Предварителна обработка на разпределени данни с помощта на Dask, Amazon ECS и Python (част 1)

Качеството и точността на моделите за машинно обучение зависят от много фактори. Един от най-критичните фактори е предварителната обработка на набора от данни, преди да бъде подаден в алгоритъма за машинно обучение, който се учи от данните. Следователно е изключително важно да им предоставите правилните данни за проблема, който искате да разрешите. Например, някои алгоритми не поддържат нулеви стойности и трябва да бъдат трансформирани и обработени преди обучение. Трябва също така да разберете защо има липсващи стойности и дали има специфичен модел за тези стойности и какво е тяхното влияние.

Инженерите по данни и специалистите по данни често използват инструменти от екосистемата на Python като NumpyиPandasза анализиране, трансформиране и визуализиране на своите данни, които са проектирани да бъдат с висока производителност, интуитивни и ефективни библиотеки. Извършването на такива операции върху малък набор от данни по бърз и мащабируем начин не е предизвикателство, стига наборът от данни да може да се побере в паметта на една машина. Въпреки това, ако наборът от данни е твърде голям и не може да се побере в една машина, инженерите по данни може да бъдат принудени да пренапишат своя код в по-мащабируеми инструменти като Spark и SparkML, които могат да бъдат изчислително поддържани от голям EMR клъстер.

За да преодолея този проблем, използвам Dask. Daskсе възползва от познанията на хората с известни библиотеки като Pandas и можете да го използвате за разработване на код за обработка на данни по скалируем, паралелен и разпределен начин.

В тази статия обсъждам как да създам клъстер без сървър за предварителна обработка на данни по разпределен и паралелен начин. Също така използвам клъстер Dask Distributed, който осигурява разширен паралелизъм за анализи и обработка на данни. За да осигуря мащабна производителност, като същевременно оптимизирам разходите и запазя гъвкавостта на решението, ще изградя клъстер без сървър, използвайки AWS Fargate, който не осигурява допълнителни разходи за управление на клъстера и едно API извикване за мащабиране на клъстера нагоре и надолу, след което ще интегрирам клъстера с Amazon Бележник SageMaker Jupyter. (Или всяка друга IDE по предпочитание)

Какво е Dask Distributed?

Dask.distributed: е лека библиотека с отворен код за разпределени изчисления в Python. Освен това е централизирано управляван, разпределен, динамичен планировчик на задачи. Dask има три основни компонента:

процес на планировчик на dask: координира действията на няколко работници. Планировчикът е асинхронен и управляван от събития, като едновременно отговаря на заявки за изчисление от множество клиенти и проследява напредъка на множество работници.

dask-worker процеси:Които са разпръснати в множество машини и едновременните заявки на няколко клиента.

dask-клиентски процес: който е основната входна точка за потребителите на dask.distributed

Диаграма на решението:

Трябва да поддържам комуникация с ниска латентност и прости мрежови конфигурации между преносимия компютър Juypter и клъстера Fargate. Следователно създавам както Notebook, така и Dask Distributed клъстер в един и същ виртуален частен облак (VPC).

Стъпки преди внедряването:

Първо, трябва да създам хранилище на еластичен регистър на контейнери (ECR) като предварително условие. За да направя това, отивам на AWS Console -› Elastic Container Service (ECS) -› Изберете Repositories и след това „Create repository“.

(Можете също да пропуснете тази стъпка и да създадете хранилище в github)

Трябва да му дадем име — „dask“, след което щракнете върху „Следваща стъпка“:

След това преминаваме към страницата „Първи стъпки“, където намираме първоначалните команди за създаване и натискане на изображението на контейнера.

Сега, след като хранилището е готово, ще превключа към шел „Терминал“, за да изтегля, създам и маркирам изображението на контейнера, след което да го насоча към ECR. За да направя това, изпълнявам следните команди на обвивката:

bash# git clone https://github.com/wmlba/ECS-Dask.git
bash# cd ECS-Dask; tar -xzf base-image.tar.gz; cd base-image
bash# `aws ecr get-login --no-include-email --region us-east-1`
bash# docker build -t dask .
bash# docker tag dask:latest <AWS Account ID>.dkr.ecr.us-east-1.amazonaws.com/dask:latest
bash# docker push <AWS Account ID>.dkr.ecr.us-east-1.amazonaws.com/dask:latest

ЗАБЕЛЕЖКА: За да изпълните горните команди, трябва да имате инсталиран Docker локално на машината, от която изпълнявате командите, и да конфигурирате идентификационните данни на AWS.

Докер изображението, което използвам, е леко модифицирано от публикуваното в dask хранилище

Ще отнеме няколко минути, за да се изгради изображението, след което да се премести в ECR хранилището, което беше създадено в по-ранна стъпка. Можете да проверите това, като щракнете върху името на хранилището в ECS конзолата.

Разположете решението

Сега, когато имам готово изображение, трябва да разположа решението с помощта на шаблон CloudFormation, като следвам стъпките по-долу:

  1. стартирайте този CloudFormation шаблон във вашия акаунт. Завършването на стека CloudFormation отнема приблизително 3–5 минути

Стекът CloudFormation ще създаде ресурси като: Fargate Cluster, дефиниции на задачи, услуги и задачи както за Dask worker, така и за Scheduler. Той също така ще създаде роля и политика за изпълнение на IAM, за да позволи достъп до хранилището на Elastic Container Registry (ECR) и групите регистрационни файлове на CloudWatch за регистрационни файлове. Всички ресурси ще бъдат създадени в региона us-east-1 по подразбиране.

2. На страницата Specify Details посочвам частна подмрежа с NAT шлюз за клъстера. Dask Scheduler и Workers ще комуникират през частна мрежа и NAT шлюзът е необходим само за услугата ECS, за да може да изтегли ECR изображението от хранилището. След това изберете Напред:

3. На страницата Опции изберете Напред.

4. На страницата Преглед прегледайте и потвърдете настройките. Не забравяйте да поставите отметка в квадратчето, което потвърждава, че шаблонът ще създаде ресурси за управление на самоличността и достъпа (IAM) на AWS с персонализирани имена. За да разположите стека, изберете Създаване. След няколко минути създаването на стека трябва да приключи.

След като стекът е създаден, можете също да потвърдите, че клъстерът ECS Dask е внедрен и работещ. Можете да проверите това, като превключите към ECS Console -› Щракнете върху Клъстери -› Щракнете върху Fargate-Dask-Clusterи в раздела със задачи трябва да има 2 изпълнявани задачи:

Сега, когато Dask Cluster е готов, ще създам SageMaker Notebook, за да мога да започна да използвам клъстера. За да направя това, преминавам към конзолата на SageMaker -› Екземпляри на бележник -›Създаване на екземпляр на бележник.

След това ще избера същите VPC и подмрежи, които бяха избрани по-рано в шаблона CloudFormation:

ЗАБЕЛЕЖКА: Можете да изберете всяка друга подмрежа и група за сигурност, стига да активирате достъпа между бележника на SageMaker и Dask Cluster.

След това създавам нов бележник на python3, като щраквам върху Нов -› conda_python3. Пакетите Dask са инсталирани по подразбиране на преносимия компютър SageMaker, но е важно да се уверите, че пакетът е актуализиран до най-новата версия. За да проверя това, ще изпълня командата conda update на бележника:

ЗАБЕЛЕЖКА: Ако версията на клиента е по-ниска от версията на планировчика и работната версия, ще срещнете грешки при стартиране на клиента.

Следващата стъпка ще бъде създаването на клиента и свързването към клъстера на dask чрез изпълнение на кода по-долу:

Забележете, че използвах DNS име на планировчика, който беше автоматично присвоен с помощта на ECS Service Discovery Functionality, който използва действия на API за автоматично наименуване на Route 53, за да управлява DNS записи на Route 53

Сега нека направим някои операции с данните, използвайки клъстера, но преди това ще увелича броя на работниците в клъстера до 7 работници. За да направя това, изпълнявам една команда в бележника, както е показано по-долу:

След няколко секунди състоянието на работните задачи в конзолата Fargate ще бъде „РАБОТЕЩО“. Ще рестартирам Dask Client, за да се уверя, че използваме паралелната природа на клъстера.

Сега имаме клъстер от 14 ядра CPU и 12 GB памет (2 CPU ядра и 2 GB памет за всеки от 7-те работници). Нека извършим някои операции, изискващи интензивно изчисление и памет, и да генерираме някои прозрения. Зареждам рамка от данни на dask с данните и изчислявам разстоянието за пътуване и групиране по броя на пътниците.

Резултатите започват да се показват след около 2,5 минути след паралелизиране на задачата между 7 различни работници и зареждане на повече от 10 GB данни паралелно.

Визуализация:

Екранни снимки от Bokeh Server в Scheduler Task, който показва операциите, които се нареждат между работниците. Таблото за управление може да бъде достъпно в браузъра от IP адреса на планировчика и порт 8787:

Следната екранна снимка показва използването на ресурси (CPU и памет) за всеки работник:

Сега трябва да сте готови да направите малко магия за предварителна обработка!

В част 2 ще покажа малко код за изпълнение на анализи и предварителна обработка/инженеринг на функции и машинно обучение с помощта на Dask Cluster, който създадохме.

Благодаря, че прочетохте поста. Обратната връзка и градивната критика винаги са добре дошли. Ще прочета всичките ви коментари.

-Ще