Как динамически создавать субтеги в Airflow

У меня есть основной даг, который извлекает файл и разбивает данные в этом файле на отдельные файлы csv. У меня есть другой набор задач, которые необходимо выполнить для каждого файла из этих файлов csv. например (загрузка в GCS, вставка в BigQuery) Как я могу динамически сгенерировать SubDag для каждого файла в зависимости от количества файлов? SubDag будет определять такие задачи, как загрузка в GCS, вставка в BigQuery, удаление файла csv)

Так вот как это выглядит сейчас

main_dag = DAG(....)
download_operator = SFTPOperator(dag = main_dag, ...)  # downloads file
transform_operator = PythonOperator(dag = main_dag, ...) # Splits data and writes csv files

def subdag_factory(): # Will return a subdag with tasks for uploading to GCS, inserting to BigQuery.
    ...
    ...

Как я могу вызвать subdag_factory для каждого файла, созданного в transform_operator?


person AshanPerera    schedule 23.02.2018    source источник
comment
Всегда ли данные разбиваются на одинаковое количество файлов? Или это будет меняться при каждом запуске?   -  person Blakey    schedule 23.02.2018
comment
Предполагается, что он будет исправлен, но есть вероятность, что он будет варьироваться от 1 до 7.   -  person AshanPerera    schedule 24.02.2018


Ответы (2)


Я пробовал создавать subdags динамически следующим образом

# create and return and DAG
def create_subdag(dag_parent, dag_id_child_prefix, db_name):
    # dag params
    dag_id_child = '%s.%s' % (dag_parent.dag_id, dag_id_child_prefix + db_name)
    default_args_copy = default_args.copy()

    # dag
    dag = DAG(dag_id=dag_id_child,
              default_args=default_args_copy,
              schedule_interval='@once')

    # operators
    tid_check = 'check2_db_' + db_name
    py_op_check = PythonOperator(task_id=tid_check, dag=dag,
                                 python_callable=check_sync_enabled,
                                 op_args=[db_name])

    tid_spark = 'spark2_submit_' + db_name
    py_op_spark = PythonOperator(task_id=tid_spark, dag=dag,
                                 python_callable=spark_submit,
                                 op_args=[db_name])

    py_op_check >> py_op_spark
    return dag

# wrap DAG into SubDagOperator
def create_subdag_operator(dag_parent, db_name):
    tid_subdag = 'subdag_' + db_name
    subdag = create_subdag(dag_parent, tid_prefix_subdag, db_name)
    sd_op = SubDagOperator(task_id=tid_subdag, dag=dag_parent, subdag=subdag)
    return sd_op

# create SubDagOperator for each db in db_names
def create_all_subdag_operators(dag_parent, db_names):
    subdags = [create_subdag_operator(dag_parent, db_name) for db_name in db_names]
    # chain subdag-operators together
    airflow.utils.helpers.chain(*subdags)
    return subdags


# (top-level) DAG & operators
dag = DAG(dag_id=dag_id_parent,
          default_args=default_args,
          schedule_interval=None)

subdag_ops = create_subdag_operators(dag, db_names)

Обратите внимание, что список входов, для которых создаются subdags, здесь db_names, может быть либо объявлен статически в файле python, либо может быть прочитан из внешнего источника.

В результате DAG выглядит так:  введите описание изображения здесь  введите описание изображения здесь

Погружение в SubDAG (s)

введите описание изображения здесь

введите описание изображения здесь

person y2k-shubham    schedule 11.07.2018
comment
Полный исходный код можно найти здесь - person y2k-shubham; 11.07.2018
comment
Привет, y2k, мне это нравится, но что будет, если нет dbnames? как будто это нужно делать с файлами, а папка пуста. Спасибо! - person arcee123; 16.02.2020

Airflow работает с DAG двумя разными способами.

  1. Один из способов - определить динамический DAG в одном файле Python и поместить его в dags_folder. И он генерирует динамический DAG на основе внешнего источника (файлы конфигурации в другом каталоге, SQL, noSQL и т. Д.). Меньше изменений в структуре DAG - лучше (на самом деле справедливо для всех ситуаций). Например, наш DAG-файл генерирует даги для каждой записи (или файла), он также генерирует dag_id. Каждый пульс планировщика воздушного потока этот код проходит по списку и генерирует соответствующий DAG. Плюсов :) не так уж и много, достаточно изменить всего один файл кода. Минусов много, и это связано с тем, как работает Airflow. Для каждого нового DAG (dag_id) воздушный поток записывает шаги в базу данных, поэтому при изменении количества шагов или имени шага это может сломать веб-сервер. Когда вы удаляете DAG из своего списка, это становится своего рода приютом, вы не можете получить к нему доступ из веб-интерфейса и не можете контролировать DAG, вы не можете видеть шаги, вы не можете перезапустить и так далее. Если у вас есть статический список DAG и идентификаторов, которые не собираются меняться, но шаги, которые иногда выполняются, этот метод приемлем.

  2. Итак, в какой-то момент я придумал другое решение. У вас есть статические DAG (они по-прежнему динамические, скрипт их генерирует, но их структура, идентификаторы не меняются). Таким образом, вместо одного сценария, который просматривает список, как в каталоге, и генерирует DAG. Вы делаете две статические группы DAG, одна периодически контролирует каталог (* / 10 ****), другая запускается первым. Поэтому, когда появляется новый файл / файлы, первый DAG запускает второй с arg conf. Следующий код должен быть выполнен для каждого файла в каталоге.

         session = settings.Session()
         dr = DagRun(
                     dag_id=dag_to_be_triggered,
                     run_id=uuid_run_id,
                     conf={'file_path': path_to_the_file},
                     execution_date=datetime.now(),
                     start_date=datetime.now(),
                     external_trigger=True)
         logging.info("Creating DagRun {}".format(dr))
         session.add(dr)
         session.commit()
         session.close()
     

Запущенный DAG может получить аргумент конфигурации и завершить все необходимые задачи для конкретного файла. Чтобы получить доступ к параметру conf, используйте это:

    def work_with_the_file(**context):
        path_to_file = context['dag_run'].conf['file_path'] \
            if 'file_path' in context['dag_run'].conf else None

        if not path_to_file:
            raise Exception('path_to_file must be provided')

Плюсы всей гибкости и функциональности Airflow

Минусы монитора DAG могут быть спамом.

person Andrey Kartashov    schedule 25.02.2018