Возможные причины разницы в производительности между двумя очень похожими фреймами данных Spark

Я работаю над улучшением производительности некоторых операций Spark для механизма рекомендаций, прототип которого я создаю. Я обнаружил существенные различия в производительности между DataFrames, которые я использую. Ниже приведены результаты описания () для обоих.

df1 (быстро, numPartitions = 4):

+-------+------------------+--------------------+
|summary|           item_id|          popularity|
+-------+------------------+--------------------+
|  count|            187824|              187824|
|   mean| 96693.34836868558|                 1.0|
| stddev|55558.023793621316|5.281958866780519...|
|    min|                 0|  0.9999999999999998|
|    max|            192806|                 1.0|
+-------+------------------+--------------------+

df2 (примерно в 10 раз медленнее, numPartitions = ± 170):

+-------+-----------------+-----------------+
|summary|          item_id|            count|
+-------+-----------------+-----------------+
|  count|           187824|           187824|
|   mean|96693.34836868558|28.70869537439305|
| stddev|55558.02379362146|21.21976457710462|
|    min|                0|                1|
|    max|           192806|              482|
+-------+-----------------+-----------------+

Оба DataFrames кэшируются, имеют одинаковый размер с точки зрения строк (187824) и столбцов (2) и имеют идентичный item_id столбец. Основное отличие состоит в том, что кадр 1 содержит во втором столбце число с плавающей запятой, а кадр 2 содержит целое число.

Кажется, что каждая операция для DataFrame 2 выполняется намного медленнее, от простой операции .describe().show() до более сложной .subtract().subtract().take(). В последнем случае DataFrame 2 занимает 18 секунд по сравнению с 2 секундами для кадра 1 (почти в 10 раз медленнее!).

Не знаю, где начать искать объяснение причины этой разницы. Приветствуются любые подсказки или подталкивания в правильном направлении.

ОБНОВЛЕНИЕ: по предложению Вячеслава Родионова количество разделов фреймов данных, по-видимому, является причиной проблем с производительностью с df2.

Если копнуть глубже, оба фрейма данных являются результатом .groupBy().agg().sortBy() операций с одним и тем же исходным фреймом данных. Операция .groupBy().agg() возвращает 200 разделов, а затем .sortBy() возвращает соответственно 4 и ± 170 разделов, почему это могло быть?


person Fulco    schedule 13.10.2016    source источник
comment
Я начну с рассмотрения df.rdd.getNumPartitions ()   -  person Viacheslav Rodionov    schedule 13.10.2016
comment
Количество разделов - 174 (медленных) против 4 (быстрых). Спасибо за этот совет, я помню, что читал что-то об этом, я буду копать глубже, чтобы разобраться в ситуации. Количество разделов было выбрано Spark автоматически. Являются ли метод проб, ошибок и корректировка вручную единственным способом сделать это?   -  person Fulco    schedule 13.10.2016


Ответы (1)


Я начну с рассмотрения df.rdd.getNumPartitions()

Меньшее количество разделов большего размера почти всегда является хорошей идеей, так как это позволяет лучше сжимать данные и выполнять более актуальную работу, а не манипулировать файлами.

Еще нужно посмотреть, как выглядят ваши данные. Уместно ли это для задачи, которую вы пытаетесь выполнить?

  • Если это поле упорядочено по дате, которое вы используете для применения BETWEEN операции, это будет быстрее, чем просто работа с несортированными данными.
  • Если вы работаете с конкретными месяцами или годами, имеет смысл разделить данные по ним.
  • То же самое и с идентификаторами. Если вы работаете с определенными идентификаторами, поместите одинаковые идентификаторы «ближе» друг к другу, разбив / отсортировав набор данных.

Мое эмпирическое правило при хранении данных - сначала разделите на несколько полей с низкой мощностью (в основном логические и даты), затем отсортируйте все другие поля с sortWithinPartitions в порядке важности данных. Таким образом вы добьетесь наилучшей степени сжатия (что означает более быстрое время обработки) и лучшей локальности данных (опять же, более быстрое время обработки). Но, как всегда, все зависит от вашего варианта использования, всегда думайте, как вы работаете со своими данными, и готовьте их соответствующим образом.

person Viacheslav Rodionov    schedule 13.10.2016
comment
Спасибо за Ваш ответ. Фактически оба фрейма данных отсортированы по вторым столбцам (количество и популярность соответственно). Кроме того, оба фрейма являются результатом .groupBy().agg().sortBy() операций из одного и того же исходного фрейма данных. Похоже, что шаг groupBy.agg() дает 200 разделов в обоих случаях, тогда как sortBy дает 4 и ~ 170 соответственно. Теперь пытаюсь понять, почему это происходит - person Fulco; 13.10.2016
comment
Я могу подтвердить, что повторное разбиение df2 на 4 дает значительное улучшение производительности. - person Fulco; 13.10.2016
comment
@Fulco groupBy сначала сгруппирует ваши данные локально. Проверьте getNumPartitions до и после groupBy().agg(). Это то же самое? Затем при сортировке вы сначала выполните предварительную сортировку локально, а затем перенесете все данные в одно место. Если ваши данные, скажем, разделены по полю, которое вы сортируете, а также предварительно отсортированы внутри раздела, то вам не нужно много передавать, у вас уже есть правильный порядок, верно? - person Viacheslav Rodionov; 13.10.2016
comment
@Fulco, если мой ответ решил вашу проблему, не могли бы вы отметить его как лучший ответ? - person Viacheslav Rodionov; 13.10.2016
comment
Эволюция количества разделов выглядит следующим образом: original = 1, после groupBy().agg() = 200, после sortBy() = 4 или ± 170. Шаг sortBy() - это та часть, где количество разделов различается, но я до сих пор не понимаю, почему это происходит. С тех пор я настроил spark.sql.shuffle.partitions на 4 на моем локальном компьютере, в отличие от значения по умолчанию 200, что улучшило общую производительность Spark на моем компьютере. Мне нужно выяснить, как разумно управлять этим параметром, когда я перехожу в кластер. - person Fulco; 14.10.2016