Предварительная обработка распределенных данных с использованием Dask, Amazon ECS и Python (часть 1)
Качество и точность моделей машинного обучения зависят от многих факторов. Одним из наиболее важных факторов является предварительная обработка набора данных перед подачей его в алгоритм машинного обучения, который учится на данных. Поэтому очень важно, чтобы вы предоставили им правильные данные для проблемы, которую вы хотите решить. Например, некоторые алгоритмы не поддерживают нулевые значения и должны быть преобразованы и обработаны перед обучением. Вы также должны понимать, почему отсутствуют значения, существует ли определенный шаблон для этих значений и каково их влияние.
Специалисты по обработке данных и специалисты по обработке данных часто используют инструменты из экосистемы Python, такие как Numpy и Pandas, для анализа, преобразования и визуализации своих данных, которые разработаны для обеспечения высокой производительности, интуитивности и эффективности. библиотеки. Выполнение таких операций с небольшим набором данных быстрым и масштабируемым образом не является сложной задачей, если набор данных может уместиться в памяти одной машины. Однако, если набор данных слишком велик и не может поместиться на одной машине, инженеры по обработке данных могут быть вынуждены переписать свой код в более масштабируемые инструменты, такие как Spark и SparkML, которые могут быть вычислительно поддержаны большим кластером EMR.
Чтобы преодолеть эту проблему, я использую Даск. Dask использует знакомство людей с известными библиотеками, такими как Pandas, и вы можете использовать его для разработки кода для масштабируемой, параллельной и распределенной обработки данных.
В этой статье я обсуждаю, как создать бессерверный кластер для предварительной обработки данных в распределенной и параллельной манере. Я также использую кластер Dask Distributed, который обеспечивает расширенный параллелизм для аналитики и обработки данных. Чтобы обеспечить масштабируемую производительность, оптимизируя стоимость и сохраняя гибкость решения, я построю бессерверный кластер с использованием AWS Fargate, который не требует накладных расходов на управление кластером и обеспечивает один вызов API для масштабирования кластера вверх и вниз, а затем интегрирую кластер с Amazon. Блокнот SageMaker Jupyter. (Или любой другой предпочтительной IDE)
Что такое Dask Distributed?
Dask.distributed: - это легкая библиотека с открытым исходным кодом для распределенных вычислений на Python. Это также централизованно управляемый распределенный динамический планировщик задач. Dask состоит из трех основных компонентов:
процесс dask-scheduler: координирует действия нескольких рабочих. Планировщик является асинхронным и управляемым событиями, он одновременно отвечает на запросы вычислений от нескольких клиентов и отслеживает прогресс нескольких рабочих процессов.
процессы dask-worker:, которые распределены по нескольким машинам и выполняют одновременные запросы нескольких клиентов.
процесс dask-client, который является основной точкой входа для пользователей dask.distributed.
Схема решения:
Мне нужно поддерживать связь с низкой задержкой и простые сетевые конфигурации между ноутбуком Juypter и кластером Fargate. Следовательно, я создаю как Notebook, так и распределенный кластер Dask в одном виртуальном частном облаке (VPC).
Действия перед развертыванием:
Во-первых, мне нужно создать репозиторий эластичного реестра контейнеров (ECR) в качестве предварительного условия. Для этого я перехожу в Консоль AWS - ›Elastic Container Service (ECS) -› Выберите Репозитории, затем «Создать репозиторий».
(Вы также можете пропустить этот шаг и создать репозиторий на github)
Нам нужно дать ему имя - «dask», затем нажать «Следующий шаг»:
Затем мы переходим на страницу «Начало работы», где находим начальные команды для создания и отправки образа контейнера.
Теперь, когда репозиторий готов, я переключусь на оболочку «Терминал», чтобы загрузить, построить и пометить образ контейнера, а затем отправить его в ECR. Для этого я запускаю следующие команды оболочки:
bash# git clone https://github.com/wmlba/ECS-Dask.git bash# cd ECS-Dask; tar -xzf base-image.tar.gz; cd base-image bash# `aws ecr get-login --no-include-email --region us-east-1` bash# docker build -t dask . bash# docker tag dask:latest <AWS Account ID>.dkr.ecr.us-east-1.amazonaws.com/dask:latest bash# docker push <AWS Account ID>.dkr.ecr.us-east-1.amazonaws.com/dask:latest
ПРИМЕЧАНИЕ. Для выполнения вышеуказанных команд необходимо, чтобы Docker был установлен локально на компьютере, с которого выполнялись команды, и были настроены учетные данные AWS.
Образ докера, который я использую, немного изменен по сравнению с тем, который опубликован в репозитории dask.
Создание образа и его отправка в репозиторий ECR, созданный на предыдущем шаге, займет несколько минут. В этом можно убедиться, щелкнув имя репозитория в консоли ECS.
Разверните решение
Теперь, когда у меня готов образ, мне нужно развернуть решение с использованием шаблона CloudFormation, выполнив следующие действия:
- запустите этот шаблон CloudFormation в своем аккаунте. Для завершения работы стека CloudFormation требуется примерно 3-5 минут.
Стек CloudFormation будет создавать такие ресурсы, как: кластер Fargate, определения задач, службы и задачи как для рабочего Dask, так и для планировщика. Он также создаст роль и политику выполнения IAM, чтобы разрешить доступ к репозиторию эластичного реестра контейнеров (ECR) и группам журналов CloudWatch для журналов. По умолчанию все ресурсы будут созданы в регионе us-east-1.
2. На странице «Указать подробности» я указываю частную подсеть со шлюзом NAT для кластера. Планировщик Dask и рабочие будут взаимодействовать через частную сеть, а шлюз NAT необходим только для того, чтобы служба ECS могла извлечь образ ECR из репозитория. Затем выберите Далее.
3. На странице Параметры нажмите Далее.
4. На странице Обзор просмотрите и подтвердите настройки. Обязательно установите флажок, подтверждающий, что шаблон будет создавать ресурсы AWS Identity and Access Management (IAM) с настраиваемыми именами. Чтобы развернуть стек, выберите Создать. Через несколько минут создание стека должно быть завершено.
После создания стека вы также можете подтвердить, что кластер ECS Dask развернут и работает. Вы можете проверить это, переключившись на консоль ECS - ›Нажмите Кластеры -› Нажмите Fargate-Dask-Cluster и на вкладке задач должны быть 2 запущенные задачи:
Теперь, когда кластер Dask готов, я создам SageMaker Notebook, чтобы начать использовать кластер. Для этого я переключаюсь в консоль SageMaker - ›Экземпляры записных книжек -› Создать экземпляр записной книжки.
Затем я выберу те же VPC и подсети, которые были выбраны ранее в шаблоне CloudFormation:
ПРИМЕЧАНИЕ. Вы можете выбрать любую другую подсеть и группу безопасности, если вы разрешаете доступ между записной книжкой SageMaker и кластером Dask.
Затем я создаю новую записную книжку python3, нажимая Создать - ›conda_python3. Пакеты Dask устанавливаются на ноутбук SageMaker по умолчанию, но важно убедиться, что пакет обновлен до последней версии. Чтобы убедиться в этом, я запущу на ноутбуке команду conda update:
ПРИМЕЧАНИЕ. Если версия клиента ниже, чем версия планировщика и исполнителя, вы столкнетесь с ошибками при запуске клиента.
Следующим шагом будет создание клиента и подключение к кластеру dask с помощью следующего кода:
Обратите внимание, что я использовал DNS-имя планировщика, которое было автоматически назначено с помощью Функциональности обнаружения служб ECS, которая использует действия API автоматического именования Route 53 для управления записями DNS Route 53.
Теперь давайте проделаем некоторые операции с данными с помощью кластера, но перед этим я увеличу количество рабочих в кластере до 7 рабочих. Для этого я запускаю одну команду в записной книжке, как показано ниже:
Через несколько секунд статус рабочих задач в консоли Fargate изменится на «РАБОТАЕТ». Я перезапущу клиент Dask, чтобы убедиться, что мы используем природу параллелизма кластера.
Теперь у нас есть кластер из 14 ядер ЦП и 12 ГБ памяти (2 ядра ЦП и 2 ГБ памяти для каждого из 7 рабочих). Давайте выполним некоторые операции с интенсивными вычислениями и памятью и получим некоторые выводы. Я загружаю фреймворк данных dask с данными и вычисляю расстояние поездки и группирую по количеству пассажиров.
Результаты начинают появляться примерно через 2,5 минуты после распараллеливания задачи между 7 разными рабочими процессами и параллельной загрузки более 10 ГБ данных.
Визуализация:
Снимки экрана с сервера Bokeh в задаче планировщика, показывающие операции, выполняемые между рабочими процессами. Доступ к панели управления в браузере можно получить с IP-адреса планировщика и порта 8787:
На следующем снимке экрана показано использование ресурсов (ЦП и памяти) для каждого рабочего:
Теперь вы должны быть готовы совершить некоторую магию предварительной обработки!
Во второй части я покажу код для запуска аналитики и предварительной обработки / разработки функций и машинного обучения с использованием созданного нами кластера Dask.
Спасибо, что прочитали пост. Отзывы и конструктивная критика всегда приветствуются. Я прочитаю все ваши комментарии.
-Воля