Оптимизация заданий Apache Spark. Часть 1

Изучаем методы оптимизации заданий Apache Spark, позволяющие добиться высокой эффективности.

Когда вы пишете код для Apache Spark и читаете документацию по API, вам встречаются такие термины, как преобразование (transformation), действие (action) и RDD (resilient distributed dataset, эластичный распределенный набор данных). Понимание Spark на данном уровне является жизненно важным для написания программ. Когда ваше приложение не работает, или когда вы обращаетесь к веб-интерфейсу, чтобы выяснить, почему ваше приложение выполняется так долго, вы также сталкиваетесь с новыми терминами, такими как задание (job), этап (stage) и задача (task). Понимание Spark на этом уровне является жизненно важным для написания хороших программ. Безусловно, я подразумеваю, что хорошие программы – это быстрые программы. Чтобы написать эффективную программу для Spark, крайне важно понимать базовую модель выполнения Spark.

Из этой статьи вы узнаете о том, как на самом деле программы Spark выполняются на кластере. Затем вы получите некоторые практические сведения о том, как написание эффективных программ взаимосвязано с моделью выполнения Spark.

Как Spark выполняет вашу программу?

Приложение Spark состоит из одного управляющего процесса, называемого драйвером (driver process), и набора исполняющих процессов, называемых исполнителями (executor process), которые рассредоточены по узлам кластера.

Драйвер обеспечивает высокоуровневое управление рабочим процессом. Исполнители обеспечивают выполнение работы в форме задач (task), а также хранение любых данных, которые пользователь хочет кэшировать. Драйвер и исполнители работают в течение всего процесса выполнения программы, при этом ресурсы для исполнителей выделяются динамически. Каждый исполнитель способен параллельно выполнять несколько задач. Развертывание этих процессов на кластере осуществляется менеджером кластера (YARN, Mesos или Spark Standalone), но драйвер и исполнители присутствуют в каждом приложении Spark.

1

На верхней позиции в иерархии выполнения находятся задания. Вызов действия в приложении Spark инициирует запуск соответствующего задания. Чтобы определить, что собой представляет это задание, Spark анализирует граф RDDs, принимающих участие в этом действии, и формирует план выполнения. Этот план начинается с наиболее старых RDDs (не зависящих от других RDDs или ссылающихся на уже кэшированные данные) и заканчивается на последнем RDD, необходимом для получения результатов действия.

План выполнения объединяет преобразования (transformation) в этапы. Этап представляет собой совокупность задач, выполняющих один и тот же код на разных подмножествах данных. Каждый этап включает в себя последовательность преобразований, которые могут быть выполнены без перемешивания (shuffle) всего набора данных.

Что определяет, должны ли данные перемешиваться? Вспомним о том, что RDD содержит фиксированное количество разделов (partition), каждый из которых содержит некоторое количество записей (record). В случае RDDs, полученных в результате так называемых узких (narrow) преобразований, таких как отображение (map) и фильтрация (filter), записи, необходимые для вычисления записей в одном разделе, находятся в одном разделе в родительском RDD. Каждый объект зависит только от одного объекта в родителе. Такие операции, как coalesce, могут привести к тому, что задача будет работать с несколькими входными разделами, но преобразование по-прежнему будет считаться узким, потому что входные записи, используемые для вычисления любой выходной записи, по-прежнему могут находиться только в ограниченном подмножестве разделов.

Однако Spark также поддерживает преобразования с широкими (wide) зависимостями, такие как groupByKey и reduceByKey. В рамках таких зависимостей, данные, необходимые для вычисления записей в одном разделе, могут находиться в нескольких разделах родительского RDD. Все кортежи (tuple) с одинаковыми ключами (key) должны оказаться в одном разделе, обрабатываемом одной задачей. Чтобы реализовать эти операции, Spark должен выполнить перемешивание, перемещающее данные по кластеру и формирующее новый этап с новым набором разделов.

Для примера рассмотрим следующий код:

Безымянный

Этот код выполняет одно действие, основывающееся на последовательности преобразований RDD, полученного из текстового файла. Код будет выполнен в один этап, потому что ни один из результатов этих трех операций не зависит от данных, которые могут поступать из разделов, отличных от тех, из которых поступают их входные данные.

Для сравнения, следующий код определяет, сколько раз каждый символ встречается во всех словах, встречающихся в текстовом файле более чем 1 000 раз.

Безымянный

Этот процесс будет разделен на три этапа. Операции reduceByKey формируют границы этапов, потому что для вычисления их результатов необходимо заново разделить данные (repartition) на основе ключей.

Ниже представлена более сложная схема преобразования, включающая соединение (join) с множественными зависимостями.

2

Розовые рамки демонстрируют результирующую схему этапов выполнения.

3

На границе каждого этапа данные записываются на диск задачами родительских (parent) этапов, а затем извлекаются через сеть задачами дочернего (child) этапа. Поскольку на границах этапов выполняются ресурсоемкие операции дискового и сетевого ввода/вывода, следует избегать границ этапов, когда это возможно. Количество разделов данных на родительском этапе может отличаться от количества разделов на дочернем этапе. Преобразования, которые могут формировать границу этапа, обычно принимают параметр numPartitions, определяющий, на какое количество разделов необходимо разделить данные на дочернем этапе.

Так же, как количество редукторов (reducer) является важным параметром при настройке заданий MapReduce, настройка количества разделов на границах этапов может существенно повлиять на производительность приложения. В дальнейшем мы глубже разберемся в том, как настроить это количество.

Выбор подходящих операторов

При реализации какой-либо программы для Spark, разработчику обычно доступны различные комплексы действий и преобразований, обеспечивающие одинаковые результаты. Однако не все они обеспечивают одинаковую производительность. Умение избегать распространенных ошибок и выбирать подходящие механизмы может существенно повысить производительность приложения. Несколько правил и рекомендаций помогут вам сделать правильный выбор.

В рамках работы над проблемой SPARK-5097 [ссылка] была начата стабилизация SchemaRDD, в результате чего оптимизатор Spark Catalyst будет открыт для программистов, использующих базовые API Spark. Как следствие, Spark получит возможность осуществлять выбор операторов на более высоком уровне. Когда SchemaRDD станет стабильным компонентом, пользователи будут избавлены от необходимости принимать некоторые из этих решений.

Основная цель при выборе набора операторов – уменьшение количества перемешиваний и объема перемешиваемых данных. Это объясняется тем, что перемешивания являются ресурсоемкими операциями, так как все перемешиваемые данные должны быть записаны на диск, а потом переданы по сети. Преобразования repartition, join, cogroup и любое из преобразований *By или *ByKey могут привести к перемешиваниям. Однако не все эти операции эквивалентны, и некоторые из самых распространенных ошибок, влияющих на производительность, которые допускают начинающие Spark-разработчики, возникают в результате неправильного выбора операции:

  • Не используйте groupByKey, когда выполняете ассоциативную операцию свертки (associative reductive operation). Например, rdd.groupByKey().mapValues(_.sum) даст те же результаты, что и rdd.reduceByKey(_ + _). Тем не менее в первом случае весь набор данных будет передан по сети, а во втором – будут вычислены локальные суммы для каждого ключа в каждом разделе, после чего эти локальные суммы будут объединены в более крупные суммы после перемешивания.
  • Не используйте reduceByKey, когда типы входного и выходного значений различны. Для примера рассмотрим преобразование, которое находит все уникальные строки, соответствующие каждому ключу. В качестве одного из вариантов можно использовать отображение, чтобы преобразовать каждый элемент в Set, а затем объединить все Set с помощью reduceByKey:

  Безымянный

Этот код приводит к созданию массы ненужных объектов, потому что объект Set должен быть создан для каждой записи. Лучше использовать операцию aggregateByKey, выполняющую агрегацию на стороне отображения (map-side) более эффективно:

Безымянный

  • Не используйте шаблон flatMap-join-groupBy. Когда два набора данных уже сгруппированы по ключу, и вы хотите соединить их, сохраняя сгруппированными, можно просто использовать когруппу (cogroup). Такой подход позволяет избежать накладных расходов, связанных с распаковкой (unpack) и повторной упаковкой (repack) групп.

Когда перемешивания не происходят

Также полезно знать о тех случаях, когда представленные выше преобразования не приводят к перемешиваниям. Spark не выполняет перемешивание, если в результате предыдущего преобразования данные уже были разделены в соответствии с тем же разделителем (partitioner).

Рассмотрим следующий код:

Безымянный

Поскольку разделитель не передается в reduceByKey, будет использован разделитель по умолчанию, в результате чего rdd1 и rdd2 будут разделены по хешу (hash-partition). Две операции reduceByKey приведут к двум перемешиваниям. Если RDDs имеют одинаковое количество разделов, тогда соединение не потребует дополнительного перемешивания. Поскольку RDDs разделены идентично, набор ключей в одном разделе rdd1 может появиться только в одном разделе rdd2. Таким образом, содержимое одного выходного раздела rdd3 будет зависеть только от содержимого одного раздела rdd1 и одного раздела rdd2, и третье перемешивание не потребуется.

Например, если someRdd имеет четыре раздела, someOtherRdd имеет два раздела, а обе операции reduceByKey используют три раздела, набор задач будет следующим:

4

Что если rdd1 и rdd2 используют разные разделители или используют разделитель по умолчанию (по хешу) с разными количествами разделов? В этом случае только один RDD (тот, у которого наименьше количество разделов) должен быть заново перемешан для соединения.

Те же преобразования, те же входные данные, разное количество разделов:

5

В качестве одного из способов избежать перемешивания при соединении двух наборов данных можно воспользоваться преимуществами «транслируемых» переменных (broadcast variable). Если один из наборов данных достаточно мал, чтобы поместиться в памяти одного исполнителя, он может быть загружен в хеш-таблицу драйвера и затем «транслироваться» каждому исполнителю. После этого преобразование map может обращаться к хеш-таблице для выполнения поиска.

В каком случае будет лучше, если будет больше перемешиваний

Существует редкое исключение из правила минимизации количества перемешиваний. Дополнительное перемешивание может повысить производительность в том случае, если оно повышает уровень параллелизма. Например, если вы получаете данные в виде нескольких неделимых файлов, разделение, диктуемое InputFormat, может поместить большое количество записей в каждый раздел, в то же время не создав достаточное количество разделов, чтобы использовать все доступные ядра. В этом случае, осуществление нового разделения с большим количеством разделов (которое инициирует перемешивание) после загрузки данных позволит операциям, поступающим после него, более эффективно использовать процессоры кластера.

Другой пример данного исключения может возникнуть при использовании действий свертки (reduce) или агрегации (aggregate) для того, чтобы агрегировать данные в драйвер. При агрегации большого количества разделов, вычисления быстро могут стать узким местом для одного потока драйвера, объединяющего все результаты вместе. Чтобы уменьшить нагрузку на драйвер, сначала можно использовать reduceByKey или aggregateByKey, чтобы выполнить стадию распределенной агрегации, которая разделяет набор данных на меньшее количество разделов. Значения в каждом разделе параллельно объединяются друг с другом перед отправкой результатов драйверу для финальной стадии агрегации. Рассмотрите treeReduce и treeAggregate в качестве примеров того, как это можно реализовать. (Обратите внимание, в самой последней версии на момент написания (1.2) эти функции помечены, как API для разработчиков, но в рамках решения проблемы SPARK-5430 предлагается добавить их стабильные версии в ядро.)

Этот прием особенно полезен, когда агрегация уже сгруппирована по ключу. Для примера рассмотрим приложение, которое должно посчитать количество вхождений каждого слова в корпусе и передать результаты в драйвер в виде отображения. В качестве подхода, реализуемого с помощью агрегации, можно вычислить локальное отображение для каждого раздела, а затем объединить отображения в драйвере. В качестве альтернативного подхода, реализуемого с помощью aggregateByKey, можно выполнить подсчет полностью распределенным способом, а затем просто передать результаты драйверу с помощью collectAsMap.

Вторичная сортировка

Еще одной полезной возможностью является преобразование repartitionAndSortWithinPartitions. Данное преобразование передает сортировку механизму перемешивания, где большие объемы данных могут быть эффективно обработаны, и сортировка может быть объединена с другими операциями.

Например, Apache Hive использует это преобразование внутри своей реализации преобразования join. Оно также играет роль ключевого строительного блока в шаблоне вторичной сортировки (secondary sort), в рамках которого вы стремитесь сгруппировать записи по ключу, а затем, в процессе итераций по значениям, относящимся к ключу, вывести их в определенном порядке. Эта задача возникает в алгоритмах, которые группируют события по пользователю, а потом анализируют события для каждого пользователя, на основе последовательности возникновения этих событий во времени. Использование repartitionAndSortWithinPartitions для выполнения вторичной сортировки на текущий момент представляет некоторые сложности для пользователя, но в рамках решения проблемы SPARK-3655 предполагается существенное упрощение.

Заключение

Теперь вы должны хорошо понимать базовые подходы, позволяющие создавать высокопроизводительные программы Spark! Во второй части мы рассмотрим оптимизацию запросов ресурсов, параллелизма и структур данных.

По материалам: Cloudera

Добавить комментарий

Ваш e-mail не будет опубликован.

закрыть

Поделиться

Отправить на почту
закрыть

Вход

закрыть

Регистрация

+ =