3 метода за паралелизиране в Spark

Мащабиране на задачи за наука за данни за скорост

Spark е страхотен за увеличаване на задачите и работните натоварвания в областта на науката за данни! Докато използвате рамки с данни и библиотеки на Spark, които работят върху тези структури от данни, можете да мащабирате до масивни набори от данни, които се разпространяват в клъстер. Има обаче някои сценарии, при които библиотеките може да не са налични за работа с рамки от данни на Spark и са необходими други подходи за постигане на паралелизиране със Spark. Тази публикация обсъжда три различни начина за постигане на паралелизация в PySpark:

  1. Native Spark: ако използвате рамки с данни и библиотеки на Spark (напр. MLlib), тогава вашият код ще бъде паралелизиран и разпространен нативно от Spark.
  2. Пулове на нишки: Многопроцесорната библиотека може да се използва за изпълнение на едновременни нишки на Python и дори за извършване на операции с кадри с данни на Spark.
  3. UDF на Pandas: Нова функция в Spark, която позволява паралелна обработка на рамки с данни на Pandas в среда на Spark.

Ще дам примери за всеки от тези различни подходи за постигане на паралелизъм в PySpark, като използвам набора от данни „Бостън жилища“ като примерен набор от данни.

Преди да започнете, важно е да направите разлика между паралелизъм и разпространение в Spark. Когато една задача е паралелизирана в Spark, това означава, че едновременни задачи може да се изпълняват на възела на драйвера или работните възли. Как задачата се разделя между тези различни възли в клъстера зависи от типовете структури от данни и библиотеки, които използвате. Възможно е да има паралелизъм без разпределение в Spark, което означава, че драйверният възел може да изпълнява цялата работа. Това е ситуация, която се случва с примера на scikit-learn с пулове от нишки, които обсъждам по-долу, и трябва да се избягва, ако е възможно. Когато една задача е разпределена в Spark, това означава, че данните, с които се работи, са разделени между различни възли в клъстера и че задачите се изпълняват едновременно. В идеалния случай искате да създавате задачи, които са паралелизирани и разпределени.

Пълният бележник за примерите, представени в този урок, е достъпен в GitHub, а изобразяване на бележника е достъпно тук. Използвах изданието на общността на Databricks, за да създам този бележник и преди писах за използването на тази среда в моята „въвеждаща публикация“ на PySpark.

Единична нишка

Преди да покажем паралелната обработка в Spark, нека започнем с пример с един възел в основния Python. Използвах набора от данни за жилищата в Бостън, за да създам регресионен модел за прогнозиране на цените на жилищата, използвайки 13 различни функции. Кодът по-долу показва как да заредите набора от данни и да конвертирате набора от данни в рамка с данни на Pandas.

След това разделяме набора от данни на групи за обучение и тестване и отделяме функциите от етикетите за всяка група. След това използваме класа LinearRegression, за да напаснем набора от данни за обучение и да създадем прогнози за набора от тестови данни. Последната част от фрагмента по-долу показва как да се изчисли коефициентът на корелация между действителните и прогнозираните цени на жилищата.

Сега имаме задача, която бихме искали да паралелизираме. За този урок целта на паралелизиране на задачата е да се изпробват едновременно различни хиперпараметри, но това е само един пример за типовете задачи, които можете да паралелизирате със Spark.

Родна искра

Ако използвате рамки с данни и библиотеки на Spark, тогава Spark естествено ще паралелизира и разпредели вашата задача. Първо ще трябва да преобразуваме рамката с данни на Pandas в рамка с данни на Spark и след това да трансформираме функциите в разреденото векторно представяне, необходимо за MLlib. Фрагментът по-долу показва как да изпълните тази задача за набора от данни за жилища.

По принцип е най-добре да избягвате зареждането на данни в представяне на Pandas, преди да го конвертирате в Spark. Вместо това използвайте интерфейси като spark.read за директно зареждане на източници на данни в рамки с данни на Spark.

Сега, след като имаме данните, подготвени във формат Spark, можем да използваме MLlib за извършване на паралелно напасване и прогнозиране на модела. Фрагментът по-долу показва как да създадете и обучите линеен регресионен модел и да изчислите коефициента на корелация за прогнозните цени на жилищата.

Когато работите с кадри с данни на Spark в средата Databricks, ще забележите списък със задачи, показан под клетката. Този резултат показва, че задачата се разпределя към различни работни възли в клъстера. В примера с една нишка, целият код се изпълнява на възела на драйвера.

Сега имаме задача за монтиране на модел и прогнозиране, която е паралелизирана. Но какво ще стане, ако искаме едновременно да изпробваме различни конфигурации на хиперпараметри? Можете да направите това ръчно, както е показано в следващите два раздела, или да използвате класа CrossValidator, който изпълнява тази операция първоначално в Spark. Кодът по-долу показва как да изпробвате различни параметри на еластична мрежа, като използвате кръстосано валидиране, за да изберете най-ефективния модел.

Ако MLlib разполага с библиотеките, от които се нуждаете за изграждане на прогнозни модели, тогава обикновено е лесно да паралелизирате задача. Въпреки това може да искате да използвате алгоритми, които не са включени в MLlib, или да използвате други библиотеки на Python, които не работят директно с рамки с данни на Spark. Това е мястото, където пуловете от нишки и Pandas UDF стават полезни.

Пулове на нишки

Един от начините, по които можете да постигнете паралелизъм в Spark, без да използвате рамки с данни на Spark, е чрез използване на многопроцесорна библиотека. Библиотеката предоставя абстракция на нишка, която можете да използвате, за да създадете паралелни нишки на изпълнение. По подразбиране обаче целият ви код ще се изпълнява на възела на драйвера. Фрагментът по-долу показва как да създадете набор от нишки, които ще работят паралелно, са връщащи резултати за различни хиперпараметри за произволна гора.

Този подход работи чрез използване на функцията map върху група от нишки. Функцията map приема ламбда израз и масив от стойности като вход и извиква ламбда израза за всяка от стойностите в масива. След като всички нишки завършат, изходът показва стойността на хиперпараметъра (n_estimators) и резултата R-квадрат за всяка нишка.

[[10, 0.92121913976894299],  
 [20, 0.92413752558900675],  
 [50, 0.92705124846648523]]

Използването на пулове от нишки по този начин е опасно, защото всички нишки ще се изпълняват на възела на драйвера. Ако е възможно, най-добре е да използвате рамки с данни на Spark, когато работите с пулове от нишки, защото тогава операциите ще бъдат разпределени между работните възли в клъстера. Версията на MLib за използване на пулове от нишки е показана в примера по-долу, който разпределя задачите към работни възли.

Pandas UDF

Една от по-новите функции в Spark, която позволява паралелна обработка, е Pandas UDF. С тази функция можете да разделите рамка от данни на Spark на по-малки набори от данни, които се разпространяват и преобразуват в обекти на Pandas, където се прилага вашата функция, а след това резултатите се комбинират обратно в една голяма рамка от данни на Spark. По същество UDF на Pandas позволяват на специалистите по данни да работят с базови библиотеки на Python, като същевременно получават предимствата на паралелизиране и разпространение. Дадох пример за тази функционалност в моята „въвеждаща публикация“ на PySpark и ще „представя“ как Zynga използва функционалността на Spark Summit 2019.

Кодът по-долу показва как да извършите паралелна (и разпределена) настройка на хиперпараметър, когато използвате scikit-learn. Първата част на този скрипт взема набора от данни от Бостън и извършва кръстосано свързване, което създава множество копия на набора от входни данни и също така добавя дървовидна стойност (n_estimators) към всяка група. След това дефинираме UDF на Pandas, който приема дял като вход (едно от тези копия) и в резултат превръща кадър с данни на Pandas, указващ стойността на хиперпараметъра, който е тестван, и резултата (r-squared ). Последната стъпка е извикването groupby и apply, което извършва паралелното изчисление.

С този подход резултатът е подобен на метода с пулове от нишки, но основната разлика е, че задачата се разпределя между работните възли, а не се изпълнява само на драйвера. Примерен резултат е по-долу:

[Row(trees=20, r_squared=0.8633562691646341), 
 Row(trees=50, r_squared=0.866335129308371), 
 Row(trees=11, r_squared=0.8257884742588874)]

Заключение

Има множество начини за постигане на паралелизъм при използване на PySpark за наука за данни. Най-добре е да използвате собствени библиотеки, ако е възможно, но въз основа на вашите случаи на употреба може да няма налични библиотеки на Spark. В тази ситуация е възможно да използвате пулове от нишки или Pandas UDF за паралелизиране на вашия Python код в среда на Spark. Просто внимавайте как паралелизирате задачите си и се опитайте също да разпределите натоварванията, ако е възможно.

Бен Вебер е главен учен по данни в Zynga. Наемаме"!