Как добавить ручные задачи в Apache Airflow Dag

Я использую Apache Airflow для управления конвейером обработки данных. В середине конвейера некоторые данные необходимо просмотреть перед обработкой следующего шага. например ... -> task1 -> human review -> task2 -> ... где задача1 и задача2 - задача обработки данных. Когда задача 1 завершена, данные, сгенерированные задачей 1, должны быть проверены человеком. После того, как рецензент утвердил данные, можно было запускать задачу 2. Задачи проверки, выполняемые человеком, могут занять очень много времени (например, несколько недель).

Я подумываю использовать внешнюю базу данных для хранения результатов проверки, проведенной человеком. И используйте Sensor, чтобы сузить результат обзора по временному интервалу. Но пока проверка не будет завершена, он будет занят сотрудником Airflow.

любая идея?


person Freedom    schedule 02.02.2018    source источник


Ответы (4)


Ваша идея мне кажется хорошей. Вы можете создать выделенную группу DAG, чтобы проверять ход процесса утверждения с помощью датчика. Если вы используете низкий тайм-аут для своего датчика и соответствующее расписание для этого DAG, скажем, каждые 6 часов. Адаптируйте его к тому, как часто эти задачи утверждаются и как скоро вам нужно выполнять последующие задачи.

person Antoine Augusti    schedule 02.02.2018

Копилка от ответа свободы и Ответ Роберта Эллиота, вот полный рабочий пример, который дает пользователю две недели на просмотр результатов первая задача перед окончательной неудачей:

from datetime import timedelta

from airflow.models import DAG
from airflow import AirflowException
from airflow.operators.python_operator import PythonOperator

from my_tasks import first_task_callable, second_task_callable


TIMEOUT = timedelta(days=14)


def task_to_fail():
    raise AirflowException("Please change this step to success to continue")


dag = DAG(dag_id="my_dag")

first_task = PythonOperator(
    dag=dag,
    task_id="first_task",
    python_callable=first_task_callable
)

manual_sign_off = PythonOperator(
    dag=dag,
    task_id="manual_sign_off",
    python_callable=task_to_fail,
    retries=1,
    max_retry_delay=TIMEOUT
)

second_task = PythonOperator(
    dag=dag,
    task_id="second_task",
    python_callable=second_task_callable
)

first_task >> manual_sign_off >> second_task
person turnerm    schedule 08.12.2019
comment
Почему вы используете max_retry_delay вместо retry_delay? Я бы сказал, что retry_delay в данном случае более уместен. - person LaurensVijnck; 13.10.2020

Коллега предложил выполнить задачу, которая всегда терпит неудачу, поэтому вручную нужно просто отметить ее как успешную. Я реализовал это так:

def always_fail():
    raise AirflowException('Please change this step to success to continue')


manual_sign_off = PythonOperator(
    task_id='manual_sign_off',
    dag=dag,
    python_callable=always_fail
)

start >> manual_sign_off >> end
person Robert Elliot    schedule 21.08.2019

До версии 1.10 я использовал функцию повтора оператора для реализации ManualSignOffTask. Оператор установил retries и retry_delay. Таким образом, задача будет перенесена после сбоя. Когда задача запланирована, она проверяет базу данных, чтобы увидеть, завершена ли выход из системы: если выход еще не был выполнен, задача завершается ошибкой и освобождает работника и ждет следующего расписания. Если завершение выполнено, задача выполняется успешно, и запуск dag продолжается.

После 1.10 вводится новое состояние TI UP_FOR_RESCHEDULE, и датчик изначально поддерживает длительные задачи.

person Freedom    schedule 23.08.2019