3 метода распараллеливания в Spark

Масштабирование задач науки о данных для скорости

Spark отлично подходит для масштабирования задач и рабочих нагрузок в области анализа данных! Пока вы используете фреймы данных и библиотеки Spark, которые работают с этими структурами данных, вы можете масштабироваться до массивных наборов данных, которые распределяются по кластеру. Однако есть некоторые сценарии, в которых библиотеки могут быть недоступны для работы с фреймами данных Spark, и для достижения распараллеливания с помощью Spark требуются другие подходы. В этом посте обсуждаются три различных способа достижения распараллеливания в PySpark:

  1. Собственный Spark: если вы используете фреймы данных и библиотеки Spark (например, MLlib), то ваш код будет распараллеливаться и распространяться автоматически с помощью Spark.
  2. Пулы потоков: многопроцессорная библиотека может использоваться для запуска параллельных потоков Python и даже выполнения операций с фреймами данных Spark.
  3. UDF Pandas: новая функция Spark, которая позволяет распараллеливать обработку фреймов данных Pandas в среде Spark.

Я приведу примеры каждого из этих различных подходов к достижению параллелизма в PySpark, используя набор данных Бостонский корпус в качестве образца набора данных.

Прежде чем приступить к работе, важно провести различие между параллелизмом и распределением в Spark. Когда задача распараллеливается в Spark, это означает, что параллельные задачи могут выполняться на узле драйвера или рабочих узлах. То, как задача распределяется между этими различными узлами в кластере, зависит от типов структур данных и библиотек, которые вы используете. В Spark возможен параллелизм без распределения, что означает, что узел драйвера может выполнять всю работу. Это ситуация, которая происходит с примером scikit-learn с пулами потоков, который я обсуждаю ниже, и ее следует избегать, если это возможно. Когда задача распределяется в Spark, это означает, что обрабатываемые данные разделяются между разными узлами в кластере и что задачи выполняются одновременно. В идеале вы хотите создавать задачи, которые одновременно распараллеливаются и распределены.

Полный блокнот для примеров, представленных в этом руководстве, доступен на GitHub, а визуализация блокнота - здесь. Я использовал версию сообщества Databricks для создания этой записной книжки и ранее писал об использовании этой среды в моем вводном посте PySpark.

Одинарная резьба

Прежде чем продемонстрировать параллельную обработку в Spark, давайте начнем с примера с одним узлом в базовом Python. Я использовал набор данных по жилью в Бостоне, чтобы построить регрессионную модель для прогнозирования цен на жилье с использованием 13 различных функций. В приведенном ниже коде показано, как загрузить набор данных и преобразовать набор данных во фрейм данных Pandas.

Затем мы разделяем набор данных на группы обучения и тестирования и отделяем функции от меток для каждой группы. Затем мы используем класс LinearRegression, чтобы соответствовать набору обучающих данных и создавать прогнозы для набора тестовых данных. В последней части фрагмента ниже показано, как рассчитать коэффициент корреляции между фактической и прогнозируемой ценами на жилье.

Теперь у нас есть задача, которую мы хотим распараллелить. В этом руководстве цель распараллеливания задачи - одновременно опробовать разные гиперпараметры, но это лишь один из примеров типов задач, которые можно распараллелить с помощью Spark.

Родная искра

Если вы используете фреймы данных и библиотеки Spark, то Spark автоматически распараллелит и распределит вашу задачу. Сначала нам нужно преобразовать фрейм данных Pandas в фрейм данных Spark, а затем преобразовать функции в разреженное векторное представление, необходимое для MLlib. В приведенном ниже фрагменте показано, как выполнить эту задачу для набора данных о жилье.

В общем, лучше избегать загрузки данных в представление Pandas перед их преобразованием в Spark. Вместо этого используйте такие интерфейсы, как spark.read, для прямой загрузки источников данных во фреймы данных Spark.

Теперь, когда у нас есть данные, подготовленные в формате Spark, мы можем использовать MLlib для параллельной подгонки и прогнозирования модели. В приведенном ниже фрагменте показано, как создать и обучить модель линейной регрессии и рассчитать коэффициент корреляции для расчетных цен на жилье.

При работе с фреймами данных Spark в среде Databricks вы увидите список задач, показанный под ячейкой. Эти выходные данные показывают, что задача распределяется по разным рабочим узлам в кластере. В однопоточном примере весь код выполняется на узле драйвера.

Теперь у нас есть задача подбора модели и прогнозирования, которая распараллеливается. Однако что, если мы также хотим одновременно опробовать разные конфигурации гиперпараметров? Вы можете сделать это вручную, как показано в следующих двух разделах, или использовать класс CrossValidator, который выполняет эту операцию изначально в Spark. В приведенном ниже коде показано, как опробовать различные параметры эластичной сети с помощью перекрестной проверки для выбора наиболее эффективной модели.

Если в MLlib есть библиотеки, необходимые для построения прогнозных моделей, то обычно распараллелить задачу несложно. Однако вы можете использовать алгоритмы, которые не включены в MLlib, или использовать другие библиотеки Python, которые не работают напрямую с фреймами данных Spark. Именно здесь становятся полезными пулы потоков и пользовательские функции Pandas.

Пулы потоков

Одним из способов добиться параллелизма в Spark без использования фреймов данных Spark является использование библиотеки многопроцессорной обработки. Библиотека предоставляет абстракцию потока, которую вы можете использовать для создания параллельных потоков выполнения. Однако по умолчанию весь ваш код будет работать на узле драйвера. В приведенном ниже фрагменте показано, как создать набор потоков, которые будут выполняться параллельно и возвращать результаты для разных гиперпараметров для случайного леса.

Этот подход работает с использованием функции карты в пуле потоков. Функция карты принимает лямбда-выражение и массив значений в качестве входных данных и вызывает лямбда-выражение для каждого из значений в массиве. После завершения всех потоков в выходных данных отображается значение гиперпараметра (n_estimators) и результат R-квадрат для каждого потока.

[[10, 0.92121913976894299],  
 [20, 0.92413752558900675],  
 [50, 0.92705124846648523]]

Использование пулов потоков таким образом опасно, потому что все потоки будут выполняться на узле драйвера. Если возможно, лучше всего использовать кадры данных Spark при работе с пулами потоков, потому что тогда операции будут распределены между рабочими узлами в кластере. Версия MLib с использованием пулов потоков показана в приведенном ниже примере, который распределяет задачи по рабочим узлам.

Пользовательские файлы Pandas

Одной из новых функций Spark, которая обеспечивает параллельную обработку, являются пользовательские функции Pandas. С помощью этой функции вы можете разделить фрейм данных Spark на меньшие наборы данных, которые распределяются и преобразуются в объекты Pandas, где применяется ваша функция, а затем результаты объединяются обратно в один большой фрейм данных Spark. По сути, пользовательские функции Pandas позволяют специалистам по обработке данных работать с базовыми библиотеками Python, получая при этом преимущества распараллеливания и распространения. Я привел пример этой функциональности в моем вводном посте PySpark, и я представлю, как Zynga использует функциональность на Spark Summit 2019.

В приведенном ниже коде показано, как выполнить параллельную (и распределенную) настройку гиперпараметров при использовании scikit-learn. Первая часть этого сценария берет набор данных Boston и выполняет перекрестное соединение, которое создает несколько копий входного набора данных, а также добавляет значение дерева (n_estimators) к каждой группе. Затем мы определяем UDF Pandas, который принимает раздел в качестве входных данных (одну из этих копий), и в результате превращает фрейм данных Pandas, указывающий значение гиперпараметра, которое было протестировано, и результат (r-квадрат ). Последним шагом является вызов groupby и apply, который выполняет параллельные вычисления.

При таком подходе результат аналогичен методу с пулами потоков, но основное отличие состоит в том, что задача распределяется по рабочим узлам, а не выполняется только на драйвере. Пример вывода ниже:

[Row(trees=20, r_squared=0.8633562691646341), 
 Row(trees=50, r_squared=0.866335129308371), 
 Row(trees=11, r_squared=0.8257884742588874)]

Заключение

Есть несколько способов добиться параллелизма при использовании PySpark для анализа данных. По возможности лучше использовать собственные библиотеки, но в зависимости от ваших сценариев использования библиотеки Spark могут быть недоступны. В этой ситуации можно использовать пулы потоков или пользовательские функции Pandas для распараллеливания кода Python в среде Spark. Просто будьте осторожны с тем, как вы распараллеливаете свои задачи, и постарайтесь также распределить рабочие нагрузки, если это возможно.

Бен Вебер - ведущий специалист по анализу данных в Zynga. Мы нанимаем"!