How-to: Загружаем и запрашиваем «быстрые» данные с помощью Impala

Система Impala позволяет анализировать данные в Apache Hadoop в режиме реального времени. Поскольку во многих случаях данные поступают в Hadoop непрерывно (например, при анализе временных рядов, выявлении мошенничества или риска в режиме реального времени и т.д.), необходимо, чтобы Impala обеспечивала возможность запрашивать новые «быстрые» данные с минимальной задержкой и без прерывания выполняющихся запросов.

В этой статье мы рассмотрим подход, позволяющий непрерывно загружать данные в Impala посредством HDFS, и продемонстрируем способность Impala запрашивать быстро изменяющиеся данные в режиме реального времени. (Следует отметить, что Kudu, новое хранилище для Hadoop, ориентированное на представление данных в форме столбцов (columnar store), находящееся сейчас на стадии бета-версии, позволяет решать эти задачи более рационально и эффективно. Тем не менее, рассматриваемое нами решение является хорошим аналогом.)

Цели

Как уже было сказано ранее, нашей целью является непрерывная загрузка микропакетов данных в Hadoop и обеспечение доступа к ним для Impala с минимальной задержкой и без прерывания выполняющихся запросов (а также без блокирования новых запросов). Поскольку загрузка происходит непрерывно, логично предположить, что отдельный загружаемый фрагмент данных является малой частью (<10%) от общего объема данных.

В рамках данного решения мы дадим следующие определения понятиям «непрерывно» (continuously) и «минимальная задержка» (minimal delay):

  • Непрерывно: пакеты загружаются с интервалом в одну минуту или более.
  • Минимальная задержка: задержка загрузки пакета + несколько минут.

Подход должен обеспечивать следующие гарантии согласованности:

  • Согласованность в пределах одной ссылки на таблицу: ссылка на таблицу, присутствующая в запросе, «видит» либо весь пакет данных полностью, либо «не видит» новых данных вообще. Ссылка на таблицу не имеет доступ к части пакета. Однако если запрос ссылается на таблицу более одного раза (в частности, в случае самосоединения (self-join)), одна ссылка на таблицу может «видеть» новый пакет данных, а другая – нет. Другими словами, при самосоединении согласованность не гарантируется. Это связано с тем, что ни Impala, ни Apache Hive не используют снимок метаданных (snapshot of metadata) при планировании запроса. Следовательно, изменения в метаданных во время планирования запроса могут привести к этой несогласованности.
  • В пределах одного соединения (сессии), если запрос «видит» новый пакет данных, все последующие запросы также будут его «видеть». Однако эта гарантия не распространяется на все сессии вследствие так называемой «итоговой согласованности» («eventual consistency») метаданных в Impala. (Синхронизацию можно выполнить с помощью параметра запроса SYNC_DDL. Подробности можно найти по ссылке) Следовательно, некоторые запросы могут видеть новые данные, а некоторые – нет, если они приходят из другой сессии.

Обзор решения

Обычно данные могут загружаться непосредственно в HDFS с помощью Apache Flume, Apache Sqoop, Flafka или команды put. (Запуск процесса загрузки данных выходит за рамки данной статьи.) Интервал загрузки менее одной минуты является достижимым. Поскольку каждая загрузка будет создавать несколько маленьких файлов, вследствие высокой частоты загрузок будет создаваться большое количество маленьких файлов, имеющих формат Apache Avro или текстовый формат. Следовательно, периодически должен выполняться процесс уплотнения (compaction), объединяющий маленькие файлы в более крупные и преобразующий их в более эффективный ориентированный на столбцы формат (columnar format) (например, Apache Parquet). Интервал уплотнения (вероятно, несколько часов) зависит от ряда факторов, например, от количества сгенерированных файлов.

Управление параллельно выполняющимися уплотнением, загрузкой и запросами может быть трудной задачей. Процесс уплотнения основан на запросе INSERT, при этом он не должен читать данные из каталога, в который активно выполняется запись (как в случае с приемной таблицей (landing table)), иначе этот каталог необходимо будет заново уплотнять позже. Кроме того, процесс уплотнения не должен писать в активно читаемый каталог (как в случае с приемной таблицей и базовой таблицей (base table)), иначе выполняющийся запрос завершится ошибкой, а загруженные новые данные будут потеряны.

Решением в данной ситуации является использование двух приемных таблиц. В каждый момент времени для записи данных доступна только одна таблица (активная), а для чтения – обе. Процесс уплотнения будет читать из неактивной таблицы, а писать в новый каталог, из которого не происходит чтение. После завершения уплотнения, исходный каталог должен быть удален из неактивной приемной таблицы, а уплотненный каталог должен быть добавлен к базовой таблице.

Поскольку удаление и добавление являются DDL-операциями, а небольшой интервал между двумя такими операциями может привести к несогласованности данных, новая базовая таблица указывает на существующие данные базовой таблицы и уплотненные каталоги, содержащие все данные неактивной приемной таблицы. Затем создается новое представление (view), инкапсулирующее новую таблицу и активную приемную таблицу. Таким образом, выполняющийся запрос будет продолжать читать исходные неуплотненные данные без каких-либо прерываний. Новые запросы будут читать уплотненные данные и не будут «видеть» исходные данные. После завершения всех выполняющихся запросов исходные данные могут быть удалены.

Пример

Рассмотрим пример, иллюстрирующий описанный подход. Вначале создадим схему таблицы store_sales:

Create table store_sales(
              txn_id int,
              item_id int,
              sales double,
              …
) partitioned by (
              sales_date string,
              store_id int
)
stored as parquet
location ‘/user/hive/warehouse/store_sales/’;

Чтобы подготовиться к приему загружаемых данных, создадим приемную таблицу, такую же, как store_sales.

Create store_sales_landing_tbl like store_sales
              location ‘/user/hive/warehouse/store_sales_landing_tbl/’
              stored as avro;

В этом примере приемная таблица имеет формат Avro, но также можно использовать текстовый формат или Parquet. Необходимо убедиться в том, что в HDFS создан каталог, доступный Impala для чтения и записи.

Создадим представление, инкапсулирующее таблицы store_sales и store_sales_landing_tbl.

Create view store_sales_view_1 as select * from store_sales union all store_sales_landing_tbl;

Все запросы будут адресованы к представлению store_sales_view_1.

Загрузка данных (интервал – одна минута)

Загружаем данные непосредственно в store_sales_landing_tbl. Допустим, что мы загружаем данные в два следующих раздела:

/store_sales_landing_tbl/sales_date=20150512/store_id=123
/store_sales_landing_tbl/sales_date=20150512/store_id=256

Если загрузка выполняется без участия Impala (например, с помощью Flume), необходимо добавить разделы для вновь созданного каталога. Команда Hive msck repair store_sales_landing_tbl обнаружит новые каталоги и добавит недостающие разделы. Поскольку Impala кеширует метаданные таблицы (например, список разделов), необходимо обновить метаданные таблицы в Impala с помощью команды refresh store_sales_landing_tbl. Следовательно, если загрузка данных происходит один раз в минуту, команды msck repair и refresh также должны выполняться один раз в минуту. (Обратите внимание, в Impala недавно была добавлена команда alter table recover partitions.

Приемная таблица содержит данные только за один день и не должна иметь более ~500 разделов, поэтому команда msck repair table должна выполниться за несколько секунд. Время, затраченное на выполнение команды msck repair table, пропорционально количеству разделов. Если количество разделов превышает 500, команда по-прежнему будет работать, но потребует больше времени.

Обновление статистики

Как уже отмечалось, поскольку загрузка выполняется с высокой частотой, маловероятно, что одна отдельная загрузка добавит существенную часть (>10%) раздела. Опираясь на это допущение, мы рекомендуем обновлять количество строк каждый час с помощью фонового задания. Чтобы обновить количество строк, необходимо знать текущее количество строк и количество новых строк. Поскольку у нас всего ~500 разделов, процесс загрузки может отслеживать количество строк с помощью тестового файла в HDFS. Однако если процесс загрузки не может сообщить количество новых строк (hdfs put ), необходимо выполнить запрос, чтобы получить количество строк:

select store_id, count(*) from store_sales_landing_tbl 
       where sales_date=20150512 group by store_id;

После того, как мы получили количество строк (например, 7012 для магазина 123 и 3451 для магазина 256), с помощью следующего SQL-оператора вручную обновим количество строк для таблицы и каждого затронутого раздела:

alter table store_sales_landing_tbl partition(sales_date=20150512, store_id=123) set tblproperties(‘numRows=’7012’, STATS_GENERATED_VIA_STATS_TASK’=’true’);
alter table store_sales_landing_tbl partition(sales_date=20150512, store_id=256) set tblproperties(‘numRows=’3451’, STATS_GENERATED_VIA_STATS_TASK’=’true’);

Подготовка к уплотнению

Использование таблицы store_sales_landing_tbl позволяет достичь высокой частоты загрузки. Одним из побочных эффектов высокой частоты загрузки является большое количество маленьких файлов. Уплотнение фалов – стандартный процесс, но в данном случае мы хотим выполнять «онлайн» уплотнение, то есть уплотнять файлы параллельно с выполняющимися запросами и загрузкой данных. Чтобы решить эту задачу, нам понадобится несколько дополнительных представлений и таблиц. На рисунке ниже представлена соответствующая концепция.

Вначале создадим две таблицы store_sales_2 и store_sales_landing_tbl_2. Они должны иметь такую же структуру и метаданные, как store_sales. Таблица store_sales_2 должна содержать в точности те же данные, что и store_sales, указывая на тот же самый каталог данных. Таблица store_sales_landing_tbl_2 исходно пуста.

Ниже представлены соответствующие операторы create table:

В Hive:

Create store_sales_2 like store_sales
      location ‘/user/hive/warehouse/store_sales/’;

Msck repair store_sales_2

(Поскольку store_sales_2 и store_sales оперируют одним каталогом, команда msck repair store_sales_2 добавит все разделы store_sales в store_sales_2.)

В Impala:

	
Invalidate Metadata store_sales_2;

Create store_sales_landing_tbl_2 like store_sales_landing_tbl location ‘/user/hive/warehouse/store_sales_landing_tbl_2/’;

Необходимо убедиться в том, что в HDFS созданы каталоги, доступные Impala для чтения и записи.

Теперь необходимо, чтобы представление store_sales_view_1 инкапсулировало store_sales, store_sales_landing_tbl и store_sales_landing_tbl_2.

create or replace view store_sales_view_1 
      select * from store_sales
      union all
      select * from store_sales_landing_tbl
      union all
      select * from store_sales_landing_tbl_2;

Создадим другое представление store_sales_view_2, указывающее на store_sales_2 и store_sales_landing_tbl_2.

create or replace view store_sales_view_2
      select * from store_sales_2
      union all
      select * from store_sales_landing_tbl_2;

Пользователи запрашивают представление store_sales_view_1 через еще одно представление store_sales_store_sales_view.

create or replace view store_sales_store_sales_view
      select * from store_sales_view_1;
1
2
create or replace view store_sales_store_sales_view
      select * from store_sales_view_1;

Ниже представлено состояние представлений:

store_sales_store_sales_view -> store_sales_view_1
store_sales_view_1 -> store_sales, 
store_sales_landing_tbl, store_sales_landing_tbl_2
store_sales_view_2 -> store_sales_2, store_sales_landing_tbl_2

Уплотнение (интервал – один день)

Направим загрузку данных в таблицу store_sales_landing_tbl_2. Загрузка может продолжаться без прерываний, но только в таблицы с индексом «_2». То есть новая загрузка пишет в /store_sales_landing_tbl_2/, затем выполняется команда msck repair store_sales_landing_tbl_2, а после этого – команда refresh store_sales_landing_tbl_2 в Impala. Прежде чем переходить к следующему шагу, необходимо дождаться завершения последней загрузки в таблицу store_sales_landing_tbl (дождаться завершения последней команды refresh store_sales_landing_tbl).

Скопируем данные из store_sales_landing_tbl в store_sales_2 и уплотним их.

1
INSERT INTO store_sales_2 [shuffle] select * from store_sales_landing_tbl;

Обновим количество строк для каждого модифицированного раздела в store_sales_2. Перечень модифицированных разделов идентичен для таблиц store_sales_2 и store_sales_landing_tbl. Поэтому с помощью команды show partitions store_sales_landing_tbl, мы можем узнать, какие разделы таблицы store_sales_2 модифицированы.

В нашем примере мы имеем два раздела в store_sales_landing_tbl. Поэтому необходимо выполнить следующее:

alter table store_sales_2 partition(sales_date=20150512, store_id=123) set tblproperties(‘numRows=’7012’, STATS_GENERATED_VIA_STATS_TASK’=’true’);
alter table store_sales_2 partition(sales_date=20150512, store_id=256) set tblproperties(‘numRows=’3451’, STATS_GENERATED_VIA_STATS_TASK’=’true’);

Ассоциируем представление store_sales_view с представлением store_sales_view_2.

Create or replace view store_sales_view as select * from store_sales_view_2

Теперь новые приходящие запросы будут «видеть» уплотненные данные, а также данные в таблице store_sales_landing_tbl_2. Все выполняющиеся запросы, не прерываясь, будут продолжать чтение из таблиц, на которые указывает представление store_sales_view_1.

http://blog.cloudera.com/wp-content/uploads/2015/10/impala-loading-f6.png

Далее мы удаляем маленькие файлы из store_sales_landing_tbl, поскольку все запросы, направленные к store_sales_view_1, завершены. Допустим, что самый продолжительный запрос, читающий из store_sales_store_sales_view, займет не более 15 минут. По истечении этого периода, не будет запросов, читающих из представления store_sales_view_1 (т.е. из таблиц store_sales и store_sales_landing_tbl). Удалим исходные маленькие файлы из таблицы store_sales_landing_tbl по истечении 15 минут путем удаления (drop) и повторного создания (create) таблицы store_sales_landing_tbl. Важно выполнить именно удаление и повторное создание (drop/create) таблицы, а не очистку всех строк таблицы с помощью команды truncate, потому что необходимо удалить все разделы.

drop table store_sales_landing_tbl;
Create store_sales_landing_tbl like store_sales
     location ‘/user/hive/warehouse/store_sales_landing_tbl’
     stored as avro;

Ассоциируем таблицу store_sales_landing_tbl с представлением store_sales_view_2.

Create or replace view store_sales_view_2
     select * from store_sales_2 
     union all 
     select * from store_sales_landing_tbl 
     union all 
     select * from store_sales_landing_tbl_2;

Синхронизируем метаданные (список разделов) таблицы store_sales с метаданными таблицы store_sales_2. Каждый раздел, добавленный в store_sales_2, добавим в store_sales и обновим количество строк вновь добавленного раздела.

Мы уже получили перечень модифицированных разделов таблицы store_sales_2 на предыдущем этапе. Используя этот же перечень, выполним следующие команды:

alter table store_sales add partition(sales_date=20150512, store_id=123);
alter table store_sales add partition(sales_date=20150512, store_id=256);
 
alter table store_sales partition(sales_date=20150512, store_id=123) set tblproperties(‘numRows=’7012’, STATS_GENERATED_VIA_STATS_TASK’=’true’); alter table store_sales partition(sales_date=20150512, store_id=256) set tblproperties(‘numRows=’3451’, STATS_GENERATED_VIA_STATS_TASK’=’true’)

Поскольку таблицы store_sales и store_sales_2 оперируют одним каталогом, после этого этапа они будут снова содержать один и тот же набор данных.

Теперь уберем таблицу store_sales_landing_tbl_2 из представления store_sales_view_1.

Create or replace view store_sales_view_1
select * from store_sales
union all 
select * from store_sales_landing_tbl;

Сейчас состояние всех представлений идентично начальному, за исключением того, что произошел обмен с участием таблиц с индексом «_2»! Ниже представлено текущее состояние:

store_sales_view -> store_sales_view_2
store_sales_view_1 -> store_sales, store_sales_landing_tbl
store_sales_view_2 -> store_sales_2, store_sales_landing_tbl_2, store_sales_landing_tbl

Замечания относительно интервалов загрузки и уплотнения

Согласно рекомендации Cloudera, интервал между загрузками не должен быть меньше одной минуты. Более высокая частота может создать чрезмерную нагрузку на хранилище метаданных Hive (metastore) и сервис обновления метаданных Impala (catalog service). Кроме того, вероятно, возрастет частота уплотнения и, соответственно, потребление системных ресурсов.

В общем случае, идеальная частота уплотнения – один раз в день. Однако непосредственно перед уплотнением приемная таблица будет содержать большое количество маленьких файлов, поэтому рекомендуется убедиться в том, что время обработки запросов, пропускная способность и потребление системных ресурсов являются удовлетворительными. В противном случае, уплотнение можно выполнять чаще.

Заключение

В этой статье мы рассмотрели метод непрерывной загрузки микропакетов данных в HDFS для использования в Impala. Эту задачу также можно решить с помощью Kudu, причем более элегантно по сравнению со сложным подходом, описанным в данной статье. Тем не менее, рассмотренное нами решение является достаточно эффективным.

По материалам: Cloudera

Перевод Станислава Петренко 

1 комментарий

  1. Anna:

    Спасибо за перевод! Еще бы поясняющие схемы к тексту, например, взаимодействие между разными системами, было бы вообще здорово

    Ответить

Добавить комментарий

Ваш адрес email не будет опубликован.

закрыть

Поделиться

Отправить на почту
закрыть

Вход

закрыть

Регистрация

+ =