Параллельное моделирование в DAG с помощью Airflow / Cloud Composer

Я хочу создать группу DAG для работы в Google Cloud Composer. Рабочий процесс содержит ParallelFor, и я не знаю, как его смоделировать.

Рабочий процесс выглядит примерно так:

task1 >> task2 >> task3 >> task4

где task2 разбивает данные на массивы x. Теперь я хочу запустить task3 параллельно для этих x-массивов. Task3 что-то выводит, а task4 объединяет выводы.

(вы можете найти изображение рабочего процесса здесь: https://github.com/Apollo-Workflows/Sentiment-Analysis)

На данный момент у меня есть две возможные идеи, как это могло бы работать:

  1. Для него существует простой синтаксис (например, >> для последовательного выполнения). Но такого синтаксиса я не нашел
  2. Работа с суб-DAG. Моя идея заключалась в том, чтобы добавить task2, чтобы он создавал x subDAG (по одному для каждого массива). SubDAG - это в основном задача3. После того, как все вложенные файлы DAG завершены, их вывод направляется в task4. Это возможно? Если да, то как мне это сделать?

person Kingvinst    schedule 17.05.2021    source источник


Ответы (2)


Я нашел решение своей проблемы. Это следует из моей первой идеи возможного решения. Просто воспользуйтесь механикой по этой ссылке:

Airflow повторно запускает одну задачу несколько раз в случае успеха

person Kingvinst    schedule 18.05.2021

Я считаю, что сообщение вы упомянутая как возможная идея, указывает направление выполнения задачи после завершения предыдущей.

Чтобы запускать даги параллельно, вы должны следовать структуре, подобной этой

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG("dag_paralel", description="Starting tutorial", schedule_interval=None,
          start_date=datetime(2019, 1, 1),
          catchup=False)

task_1 = BashOperator(task_id='task_1', bash_command='echo "This is task 1!"',dag=dag)
task_2 = BashOperator(task_id='task_2', bash_command='echo "This is task 2!"',dag=dag)

task_list = []
max_attempt = 3
for attempt in range(max_attempt):
    data_pull = BashOperator(
        task_id='task_3_{}'.format(attempt),
        bash_command='echo "This is task - 3_{}!"'.format(attempt),
        dag=dag
    )
    task_list.append(data_pull)


data_validation = BashOperator(task_id='task_final', bash_command='echo "We are at the end"',dag=dag)


task_1 >> task_2 >> task_list

task_list >> data_validation

Это структура DAG, полученная этим методом

введите описание изображения здесь

person Fran Verdejo    schedule 26.05.2021