Предварительная обработка распределенных данных с использованием Dask, Amazon ECS и Python (часть 1)

Качество и точность моделей машинного обучения зависят от многих факторов. Одним из наиболее важных факторов является предварительная обработка набора данных перед подачей его в алгоритм машинного обучения, который учится на данных. Поэтому очень важно, чтобы вы предоставили им правильные данные для проблемы, которую вы хотите решить. Например, некоторые алгоритмы не поддерживают нулевые значения и должны быть преобразованы и обработаны перед обучением. Вы также должны понимать, почему отсутствуют значения, существует ли определенный шаблон для этих значений и каково их влияние.

Специалисты по обработке данных и специалисты по обработке данных часто используют инструменты из экосистемы Python, такие как Numpy и Pandas, для анализа, преобразования и визуализации своих данных, которые разработаны для обеспечения высокой производительности, интуитивности и эффективности. библиотеки. Выполнение таких операций с небольшим набором данных быстрым и масштабируемым образом не является сложной задачей, если набор данных может уместиться в памяти одной машины. Однако, если набор данных слишком велик и не может поместиться на одной машине, инженеры по обработке данных могут быть вынуждены переписать свой код в более масштабируемые инструменты, такие как Spark и SparkML, которые могут быть вычислительно поддержаны большим кластером EMR.

Чтобы преодолеть эту проблему, я использую Даск. Dask использует знакомство людей с известными библиотеками, такими как Pandas, и вы можете использовать его для разработки кода для масштабируемой, параллельной и распределенной обработки данных.

В этой статье я обсуждаю, как создать бессерверный кластер для предварительной обработки данных в распределенной и параллельной манере. Я также использую кластер Dask Distributed, который обеспечивает расширенный параллелизм для аналитики и обработки данных. Чтобы обеспечить масштабируемую производительность, оптимизируя стоимость и сохраняя гибкость решения, я построю бессерверный кластер с использованием AWS Fargate, который не требует накладных расходов на управление кластером и обеспечивает один вызов API для масштабирования кластера вверх и вниз, а затем интегрирую кластер с Amazon. Блокнот SageMaker Jupyter. (Или любой другой предпочтительной IDE)

Что такое Dask Distributed?

Dask.distributed: - это легкая библиотека с открытым исходным кодом для распределенных вычислений на Python. Это также централизованно управляемый распределенный динамический планировщик задач. Dask состоит из трех основных компонентов:

процесс dask-scheduler: координирует действия нескольких рабочих. Планировщик является асинхронным и управляемым событиями, он одновременно отвечает на запросы вычислений от нескольких клиентов и отслеживает прогресс нескольких рабочих процессов.

процессы dask-worker:, которые распределены по нескольким машинам и выполняют одновременные запросы нескольких клиентов.

процесс dask-client, который является основной точкой входа для пользователей dask.distributed.

Схема решения:

Мне нужно поддерживать связь с низкой задержкой и простые сетевые конфигурации между ноутбуком Juypter и кластером Fargate. Следовательно, я создаю как Notebook, так и распределенный кластер Dask в одном виртуальном частном облаке (VPC).

Действия перед развертыванием:

Во-первых, мне нужно создать репозиторий эластичного реестра контейнеров (ECR) в качестве предварительного условия. Для этого я перехожу в Консоль AWS - ›Elastic Container Service (ECS) -› Выберите Репозитории, затем «Создать репозиторий».

(Вы также можете пропустить этот шаг и создать репозиторий на github)

Нам нужно дать ему имя - «dask», затем нажать «Следующий шаг»:

Затем мы переходим на страницу «Начало работы», где находим начальные команды для создания и отправки образа контейнера.

Теперь, когда репозиторий готов, я переключусь на оболочку «Терминал», чтобы загрузить, построить и пометить образ контейнера, а затем отправить его в ECR. Для этого я запускаю следующие команды оболочки:

bash# git clone https://github.com/wmlba/ECS-Dask.git
bash# cd ECS-Dask; tar -xzf base-image.tar.gz; cd base-image
bash# `aws ecr get-login --no-include-email --region us-east-1`
bash# docker build -t dask .
bash# docker tag dask:latest <AWS Account ID>.dkr.ecr.us-east-1.amazonaws.com/dask:latest
bash# docker push <AWS Account ID>.dkr.ecr.us-east-1.amazonaws.com/dask:latest

ПРИМЕЧАНИЕ. Для выполнения вышеуказанных команд необходимо, чтобы Docker был установлен локально на компьютере, с которого выполнялись команды, и были настроены учетные данные AWS.

Образ докера, который я использую, немного изменен по сравнению с тем, который опубликован в репозитории dask.

Создание образа и его отправка в репозиторий ECR, созданный на предыдущем шаге, займет несколько минут. В этом можно убедиться, щелкнув имя репозитория в консоли ECS.

Разверните решение

Теперь, когда у меня готов образ, мне нужно развернуть решение с использованием шаблона CloudFormation, выполнив следующие действия:

  1. запустите этот шаблон CloudFormation в своем аккаунте. Для завершения работы стека CloudFormation требуется примерно 3-5 минут.

Стек CloudFormation будет создавать такие ресурсы, как: кластер Fargate, определения задач, службы и задачи как для рабочего Dask, так и для планировщика. Он также создаст роль и политику выполнения IAM, чтобы разрешить доступ к репозиторию эластичного реестра контейнеров (ECR) и группам журналов CloudWatch для журналов. По умолчанию все ресурсы будут созданы в регионе us-east-1.

2. На странице «Указать подробности» я указываю частную подсеть со шлюзом NAT для кластера. Планировщик Dask и рабочие будут взаимодействовать через частную сеть, а шлюз NAT необходим только для того, чтобы служба ECS могла извлечь образ ECR из репозитория. Затем выберите Далее.

3. На странице Параметры нажмите Далее.

4. На странице Обзор просмотрите и подтвердите настройки. Обязательно установите флажок, подтверждающий, что шаблон будет создавать ресурсы AWS Identity and Access Management (IAM) с настраиваемыми именами. Чтобы развернуть стек, выберите Создать. Через несколько минут создание стека должно быть завершено.

После создания стека вы также можете подтвердить, что кластер ECS Dask развернут и работает. Вы можете проверить это, переключившись на консоль ECS - ›Нажмите Кластеры -› Нажмите Fargate-Dask-Cluster и на вкладке задач должны быть 2 запущенные задачи:

Теперь, когда кластер Dask готов, я создам SageMaker Notebook, чтобы начать использовать кластер. Для этого я переключаюсь в консоль SageMaker - ›Экземпляры записных книжек -› Создать экземпляр записной книжки.

Затем я выберу те же VPC и подсети, которые были выбраны ранее в шаблоне CloudFormation:

ПРИМЕЧАНИЕ. Вы можете выбрать любую другую подсеть и группу безопасности, если вы разрешаете доступ между записной книжкой SageMaker и кластером Dask.

Затем я создаю новую записную книжку python3, нажимая Создать - ›conda_python3. Пакеты Dask устанавливаются на ноутбук SageMaker по умолчанию, но важно убедиться, что пакет обновлен до последней версии. Чтобы убедиться в этом, я запущу на ноутбуке команду conda update:

ПРИМЕЧАНИЕ. Если версия клиента ниже, чем версия планировщика и исполнителя, вы столкнетесь с ошибками при запуске клиента.

Следующим шагом будет создание клиента и подключение к кластеру dask с помощью следующего кода:

Обратите внимание, что я использовал DNS-имя планировщика, которое было автоматически назначено с помощью Функциональности обнаружения служб ECS, которая использует действия API автоматического именования Route 53 для управления записями DNS Route 53.

Теперь давайте проделаем некоторые операции с данными с помощью кластера, но перед этим я увеличу количество рабочих в кластере до 7 рабочих. Для этого я запускаю одну команду в записной книжке, как показано ниже:

Через несколько секунд статус рабочих задач в консоли Fargate изменится на «РАБОТАЕТ». Я перезапущу клиент Dask, чтобы убедиться, что мы используем природу параллелизма кластера.

Теперь у нас есть кластер из 14 ядер ЦП и 12 ГБ памяти (2 ядра ЦП и 2 ГБ памяти для каждого из 7 рабочих). Давайте выполним некоторые операции с интенсивными вычислениями и памятью и получим некоторые выводы. Я загружаю фреймворк данных dask с данными и вычисляю расстояние поездки и группирую по количеству пассажиров.

Результаты начинают появляться примерно через 2,5 минуты после распараллеливания задачи между 7 разными рабочими процессами и параллельной загрузки более 10 ГБ данных.

Визуализация:

Снимки экрана с сервера Bokeh в задаче планировщика, показывающие операции, выполняемые между рабочими процессами. Доступ к панели управления в браузере можно получить с IP-адреса планировщика и порта 8787:

На следующем снимке экрана показано использование ресурсов (ЦП и памяти) для каждого рабочего:

Теперь вы должны быть готовы совершить некоторую магию предварительной обработки!

Во второй части я покажу код для запуска аналитики и предварительной обработки / разработки функций и машинного обучения с использованием созданного нами кластера Dask.

Спасибо, что прочитали пост. Отзывы и конструктивная критика всегда приветствуются. Я прочитаю все ваши комментарии.

-Воля