Оптимизация заданий Apache Spark. Часть 2

Из этой статьи вы узнаете, как распределение ресурсов, параллелизм и представление данных влияют на производительность заданий Spark.

Продолжим обсуждение, начатое в статье «Оптимизация заданий Apache Spark. Часть 1». Мы постараемся охватить как можно больше материала, который поможет вам создавать быстрые программы Spark. В частности, вы узнаете, как оптимизировать распределение ресурсов, или, другими словами, как сконфигурировать Spark таким образом, чтобы использовать все преимущества кластера.

Затем мы займемся оптимизацией параллелизма, являющегося самым сложным и самым важным фактором, определяющим производительность заданий. Наконец, мы рассмотрим представление данных: формат для размещения на диске (on-disk), используемый при чтении данных (рекомендую использовать Apache Avro или Apache Parquet), а также формат для размещения в оперативной памяти (in-memory), используемый при кэшировании или перемещении данных в системе.

Оптимизация распределения ресурсов

Типичный вопрос, с которым пользователи Spark обращаются за поддержкой, выглядит примерно следующим образом: «У меня есть кластер из 500 узлов, но когда я запускаю свое приложение, одновременно выполняются только две задачи. ПОМОГИТЕ». Учитывая количество параметров, управляющих использованием ресурсов Spark, подобные вопросы не являются тривиальными.

В данном разделе вы узнаете, как выжать все до последней капли из своего кластера. Рекомендации и конфигурации немного отличаются в зависимости от используемого менеджера кластера (YARN, Mesos или Spark Standalone), но мы сфокусируемся только на YARN, который Cloudera рекомендует всем пользователям.

Два основных целевых ресурса для Spark (и YARN) – это процессоры и оперативная память. Дисковый и сетевой ввод/вывод, безусловно, также влияют на производительность, но ни Spark, ни YARN на текущий момент не участвуют в активном управлении этими ресурсами.

Каждому исполнителю (executor) в приложении Spark выделяется одинаковое фиксированное количество ядер и одинаковый фиксированный размер кучи (heap). Количество ядер можно задать с помощью параметра —executor-cores при запуске spark-submit, spark-shell или pyspark из командной строки, или задав свойство spark.executor.cores в файле spark-defaults.conf или с помощью объекта SparkConf.

Размер кучи можно задать аналогичным образом с помощью параметра —executor-memory или свойства spark.executor.memory. Свойство cores определяет количество задач (task), которые может одновременно выполнять исполнитель. Например, —executor-cores 5 означает, что каждый исполнитель может одновременно выполнять максимум пять задач. Свойство memory влияет на объем кэшируемых данных, а также на максимальные размеры структур для перемешивания (shuffle) данных, используемых при группировании, агрегации и соединении.

Параметр командной строки —num-executors или свойство конфигурации spark.executor.instances задают необходимое количество исполнителей. Начиная с CDH 5.4/Spark 1.3, пользователь сможет не задавать это свойство, если активирует динамическое выделение ресурсов с помощью свойства spark.dynamicAllocation.enabled. Динамическое выделение ресурсов позволяет приложениям Spark запрашивать исполнители, когда накапливаются задачи, ожидающие выполнения, и освобождать исполнители в периоды бездействия.

Также важно позаботиться о том, чтобы ресурсы, запрашиваемые Spark, соответствовали ресурсам, имеющимся в наличии у YARN. Свойства YARN, относящиеся к данному вопросу:

  • nodemanager.resource.memory-mb задает максимальный суммарный объем памяти, используемой контейнерами на каждом узле;
  • nodemanager.resource.cpu-vcores задает максимальное суммарное количество ядер, используемых контейнерами на каждом узле.

В результате запроса пяти ядер для исполнителей, у YARN будет запрошено пять виртуальных ядер. Память, запрашиваемая у YARN, определяется немного сложнее по нескольким причинам:

  • Параметр executormemory и свойство spark.executor.memory задают размер кучи исполнителя, но виртуальные машины Java (JVM) могут использовать некоторый объем памяти вне кучи, например, для интернированных строк (interned string) и прямых байтовых буферов (direct byte buffer). Значение свойства yarn.executor.memoryOverhead прибавляется к объему памяти исполнителя, чтобы определить полный объем памяти, запрашиваемой у YARN для каждого исполнителя. По умолчанию значение этого свойства равно max(384, 0,07 * spark.executor.memory).
  • YARN может незначительно округлять объем запрашиваемой памяти в большую сторону. Свойства yarn.scheduler.minimumallocationmb и yarn.scheduler.incrementallocationmb задают минимальный объем и величину приращения запрашиваемой памяти соответственно.

На следующем рисунке (масштаб не соответствует значениям по умолчанию) представлена иерархия свойств Spark и YARN, управляющих памятью:

spark-tuning2-f1

Рассмотрим еще несколько дополнительных моментов, касающихся исполнителей Spark:

  • Мастер приложения (application master), являющийся невыполняющим контейнером (non-executor container) и обладающий специальной возможностью запрашивать контейнеры у YARN, в процессе своей работы также потребляет ресурсы, которые необходимо учитывать. В режиме yarn-client по умолчанию мастеру приложения выделяется 1 024 МБ памяти и одно виртуальное ядро. В режиме yarn-cluster драйвер (driver process) выполняется внутри мастера приложения, поэтому обычно полезно задать ресурсы драйвера с помощью параметров —driver-memory и —driver-cores.
  • Выделение исполнителям слишком большого объема памяти обычно приводит к чрезмерным задержкам в процессе сборки мусора (garbage collection). Примерно 64 ГБ – оптимальный верхний предел для одного исполнителя.
  • Как мы уже отмечали, у HDFS-клиента возникают проблемы при большом количестве одновременно выполняющихся потоков. Как максимум пять задач на исполнитель позволяют использовать всю пропускную способность записи, поэтому необходимо, чтобы количество ядер на исполнитель было меньше этого значения.
  • Применение «маленьких» исполнителей (например, с одним ядром и объемом памяти, достаточным для запуска только одной задачи) лишает преимуществ от выполнения нескольких задач на одной JVM. Например, «транслируемые» переменные (broadcast variable) должны быть скопированы один раз для каждого исполнителя, поэтому большое количество «маленьких» исполнителей приведет к значительному возрастанию количества копий данный.

Чтобы конкретизировать изложенный выше материал, рассмотрим практический пример конфигурирования приложения Spark для максимального использования возможностей кластера. Рассмотрим кластер, состоящий из шести узлов, на которых работают менеджеры узлов (NodeManager). Каждый узел имеет 16 ядер и 64 ГБ оперативной памяти. Свойствам менеджеров узлов yarn.nodemanager.resource.memory-mb и yarn.nodemanager.resource.cpu-vcores, вероятно, необходимо задать следующие значения: 63 * 1 024 = 64 512 (МБ) и 15 соответственно. Мы не выделяем 100% ресурсов для контейнеров YARN, потому что на каждом узле необходимо зарезервировать некоторые ресурсы для работы операционной системы и демонов Hadoop. В данном случае мы резервируем 1 ГБ и 1 ядро для этих системных процессов. Cloudera Manager помогает пользователям, осуществляя учет и автоматически конфигурируя эти свойства YARN.

Вероятно, первым делом мы захотим использовать следующую конфигурацию: numexecutors 6 —executorcores 15 —executormemory 63G. Тем не менее это неправильный подход по следующим причинам:

  • Величина 63 ГБ + накладные расходы (memory overhead) исполнителей превышает емкость менеджера узла, которая составляет 63 ГБ.
  • Мастер приложения будет использовать одно ядро на одном из узлов, а это означает, что на этом узле будет недостаточно ядер для исполнителя с 15 ядрами.
  • Значение 15 ядер на исполнитель может привести к низкой пропускной способности ввода/выводы HDFS.

Более рациональной будет следующая конфигурация: —num-executors 17 —executor-cores 5 —executor-memory 19G. Почему?

  • В результате этой конфигурации на каждом узле будет по три исполнителя, кроме того узла, на котором выполняется мастер приложения, где будет два исполнителя.
  • Значение параметра —executor-memory было получено следующим образом: 63 / 3 исполнителя на узел = 21; 21 * 0,07 = 1,47; 21 – 1,47 ~ 19.

Оптимизация параллелизма

Как вы знаете, Spark – это система параллельной обработки данных. Возможно, менее очевидно то, что Spark не является «волшебной» системой и ограничен в своих возможностях определять оптимальные параметры параллелизма. Каждый этап (stage) включает в себя некоторое количество задач (task), последовательно обрабатывающих данные. При оптимизации заданий Spark, количество задач является, вероятно, самым важным параметром, влияющим на производительность.

Как определяется количество задач? Формирование этапов и группирование RDDs (resilient distributed dataset, эластичный распределенный набор данных) рассмотрены в предыдущей статье. (Кратко напомним: такие преобразования, как repartition и reduceByKey, формируют границы этапов.) Количество задач этапа равно количеству разделов (partition) последнего RDD этапа. Количество разделов RDD равно количеству разделов RDD, от которого он зависит, с несколькими исключениями: преобразование coalesce позволяет создать RDD с меньшим количеством разделов, чем в родительском RDD; преобразование union создает RDD, количество разделов в котором равно сумме количеств разделов родителей; преобразование cartesian создает RDD, количество разделов в котором равно произведению количеств разделов родителей.

А что насчет RDDs без родителей? Количество разделов RDDs, полученных из textFile или hadoopFile, определяется базовой спецификацией InputFormat системы MapReduce. Обычно создается раздел для каждого прочитанного блока HDFS. Количество разделов RDDs, созданных с помощью метода parallelize, определяется параметром метода, переданным пользователем, или свойством spark.default.parallelism, если параметр метода не задан.

Чтобы узнать количество разделов RDD, можно использовать rdd.partitions().size().

Основные опасения вызывает слишком малое количество задач. Если количество задач меньше, чем количество доступных для их выполнения слотов (slot), этап не сможет получить преимущества от использования всех доступных процессоров.

Малое количество задач также означает, что любые операции агрегации, происходящие в каждой задаче, будут более ограничены в отношении памяти. Любая из операций join, cogroup или *ByKey предполагает хранение объектов для группирования или сортировки в хэш-картах (hashmap) или в буферах в оперативной памяти. Операции join, cogroup и groupByKey используют эти структуры данных в задачах этапов, выполняющихся на стороне выборки (fetching side) инициируемых ими перемешиваний. Операции reduceByKey и aggregateByKey используют эти структуры данных в задачах этапов, выполняющихся по обе стороны инициируемых ими перемешиваний.

Если записи (record), предназначенные для этих операций агрегации с трудом помещаются в памяти, может возникнуть некоторый беспорядок. Во-первых, размещение большого количества записей в этих структурах данных создает нагрузку на процесс сборки мусора, что может привести к задержкам в работе. Во-вторых, когда записи не помещаются в памяти, Spark будет выгружать их на диск, что приведет к выполнению операций дискового ввода/вывода и сортировки. Эти дополнительные расходы ресурсов во время больших перемешиваний, вероятно, являются причиной №1, вызывающей остановку заданий, из тех, что я наблюдал у клиентов Cloudera.

Каким же образом увеличить количество разделов? Если данный этап выполняет чтение из Hadoop, вам доступны следующие варианты:

  • используйте преобразование repartition, инициирующее перемешивание;
  • сконфигурируйте InputFormat так, чтобы создать больше сплитов (split);
  • запишите входные данные в HDFS, используя меньший размер блока.

Если этап получает входные данные от другого этапа, преобразование, сформировавшее границу этапа, принимает параметр numPartitions, как показано ниже:

Безымянный

Каким должно быть значение X? Самый простой способ оптимизировать количество разделов – это эксперимент. Возьмите количество разделов родительского RDD и умножайте его на 1,5, пока производительность не перестанет улучшаться.

Существует более принципиальный способ расчета X, но его трудно применить априори, поскольку некоторые величины трудно вычислить. Я привожу здесь этот расчет не потому, что он рекомендуется для ежедневного использования, а потому что он помогает понять логику. Основной целью является достаточное количество задач, чтобы данные, предназначенные для каждой задачи, помещались в памяти этой задачи.

Объем памяти, доступный для каждой задачи, вычисляется по формуле: (spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction) / spark.executor.cores. Свойства spark.shuffle.memoryFraction и spark.shuffle.safetyFraction по умолчанию равны 0,2 и 0,8 соответственно.

Общий объем памяти, занимаемый перемешиваемыми данными, определить сложнее. В качестве ближайшей эвристики можно найти отношение показателей Shuffle Spill (Memory) и Shuffle Spill (Disk) для данного этапа. Затем необходимо умножить общий объем записываемых при перемешивании данных на полученное число. Однако это может несколько усложниться, если этап выполняет свертку (reduction):

spark-tuning2-f2

Затем немного округлите в большую сторону, потому что обычно лучше иметь слишком много разделов, чем слишком мало.

Фактически, когда есть сомнения, почти всегда лучше ошибиться в сторону большего количества задач (и, соответственно, разделов). Этот совет противоречит рекомендациям для MapReduce, которые советуют быть более сдержанным в отношении количества задач. Разница заключается в том, что в случае MapReduce имеют место большие накладные расходы при старте задач, а в случае Spark такой эффект не наблюдается.

Уменьшение структур данных

Данные обрабатываются в Spark в форме записей. Запись имеет два представления: десериализованный Java-объект (deserialized Java object) и сериализованный двоичный объект (serialized binary object). В общем случае Spark использует десериализованное представление для записей, находящихся в оперативной памяти, и сериализованное представление для записей, находящихся на диске или передаваемых по сети. В настоящее время ведется работа по улучшению Spark, в рамках которой планируется в будущем представлять часть перемешиваемых данных, размещаемых в оперативной памяти, в сериализованной форме.

Свойство spark.serializer задает сериализатор (serializer), используемый для конвертирования этих представлений в обоих направлениях. Предпочтительным сериализатором является Kryo (org.apache.spark.serializer.KryoSerializer). К сожалению, Kryo не является сериализатором по умолчанию из-за некоторой его нестабильности в ранних версиях Spark и желания не нарушать совместимость, но он должен использоваться всегда.

Объем записей в двух этих представлениях имеет существенное влияние на производительность Spark. Имеет смысл изучить типы данных, обрабатываемые в системе, и поискать места, где можно убрать немного «жира».

Раздутые десериализованные объекты приведут к тому, что Spark будет чаще выгружать данные на диск, и уменьшится количество десериализованных записей, которые Spark может кэшировать (например, на уровне хранения (storage level) MEMORY). Руководство по настройке Spark содержит отличный раздел, где рассматриваются способы уменьшения этих объектов.

Раздутые сериализованные объекты приведут к тому, что возрастет количество операций дискового и сетевого ввода/вывода, и уменьшится количество сериализованных записей, которые Spark может кэшировать (например, на уровне хранения MEMORY_SER). В данном случае можно предпринять следующее: необходимо убедиться в том, что все определенные вами пользовательские классы зарегистрированы с помощью API SparkConf#registerKryoClasses.

Форматы данных

Всегда, когда вы можете выбирать способ хранения данных на диске, используйте расширяемый двоичный формат, например, Avro, Parquet, Thrift или Protobuf. Выберите один из этих форматов и придерживайтесь его. Если уточнить, то когда говорят об использовании Avro, Thrift или Protobuf в Hadoop, имеют в виду, что каждая запись представляет собой структуру Avro/Thrift/Protobuf, хранящуюся в файле последовательности (sequence file). JSON просто того не стоит.

Каждый раз, когда вы собираетесь хранить большой объем данных в формате JSON, думайте о конфликтах, которые будут начаты на Ближнем Востоке, о прекрасных реках Канады, которые будут перегорожены плотинами, или о радиоактивных осадках, спровоцированных атомными станциями, которые будут построены в Америке, чтобы генерировать энергию для питания процессоров, снова и снова выполняющих парсинг ваших файлов. Кроме того, совершенствуйте свои навыки общения с людьми, чтобы убедить коллег и начальство рассуждать таким же образом.

По материалам: Cloudera

Перевод Станислава Петренко

Добавить комментарий

Ваш e-mail не будет опубликован.

закрыть

Поделиться

Отправить на почту
закрыть

Вход

закрыть

Регистрация

+ =