Воздушный поток - запускайте каждую функцию Python отдельно

У меня есть сценарий воздушного потока ниже, который запускает все сценарии Python как одну функцию. Я хотел бы, чтобы каждая функция python запускалась индивидуально, чтобы я мог отслеживать каждую функцию и их статус.

## Third party Library Imports

import psycopg2
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
#from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io


# Following are defaults which can be overridden later on
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 23, 12),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}

dag = DAG('sample_dag', default_args=default_args, catchup=False, schedule_interval="@once")


#######################
## Login to DB


def db_log():
    global db_con
    try:
    db_con = psycopg2.connect(
    " dbname = 'name' user = 'user' password = 'pass' host = 'host' port = 'port' sslmode = 'require' ")
    except:
        print("Connection Failed.")
        print('Connected successfully')
    return (db_con)

def insert_data():
    cur = db_con.cursor()
    cur.execute("""insert into tbl_1 select id,bill_no,status from tbl_2 limit 2;""")


def job_run():
    db_log()
    insert_data()



##########################################

t1 = PythonOperator(
    task_id='DB_Connect',
    python_callable=job_run,
    # bash_command='python3 ~/airflow/dags/sample.py',
    dag=dag)

t1

Приведенный выше сценарий работает нормально, но хотелось бы разбить его по функциям, чтобы лучше отслеживать. Может ли кто-нибудь помочь в этом. Tnx ..

Обновленный код (версия 2):

## Third party Library Imports

import psycopg2
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
#from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io


# Following are defaults which can be overridden later on
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 23, 12),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}

dag = DAG('sample_dag', default_args=default_args, catchup=False, schedule_interval="@once")


#######################
## Login to DB


def db_log(**kwargs):
    global db_con
    try:
    db_con = psycopg2.connect(
    " dbname = 'name' user = 'user' password = 'pass' host = 'host' port = 'port' sslmode = 'require' ")
    except:
        print("Connection Failed.")
        print('Connected successfully')
        task_instance = kwargs['task_instance']
        task_instance.xcom_push(value="db_con", key="db_log")
        return (db_con)

def insert_data(**kwargs):
    v1 = task_instance.xcom_pull(key="db_con", task_ids='db_log')
    return (v1)
    cur = db_con.cursor()
    cur.execute("""insert into tbl_1 select id,bill_no,status from tbl_2 limit 2;""")

#def job_run():
#    db_log()
#    insert_data()


##########################################

t1 = PythonOperator(
    task_id='Connect',
    python_callable=db_log,provide_context=True,
    dag=dag)

t2 = PythonOperator(
    task_id='Query',
    python_callable=insert_data,provide_context=True,
    dag=dag)


t1 >> t2

person dark horse    schedule 28.05.2018    source источник
comment
@ tobi6, получить ошибку в синтаксисе bash_command (2-я последняя строка обновленного кода). Не могли бы вы посоветовать, в чем я ошибся. Tnx.   -  person dark horse    schedule 28.05.2018
comment
@ tobi6 Сообщение об ошибке: bash_command = 'python3 ~ / airflow / dags / sample.py {{task_instance.xcom_pull (task_id =' job_run ', key =' dwh_connection ')}}', dag = dag,) ^ SyntaxError: недопустимый синтаксис Как я могу это исправить. Не могли бы вы посоветовать .. Tnx   -  person dark horse    schedule 28.05.2018
comment
@ tobi6, я подумал, что нам нужно передать insert_data таким же образом, и поэтому использовал его таким образом. Что касается оператора bash, я не уверен, как передать данные для входа (datawarehouse_login ()), на которые будут ссылаться несколько функций в сценарии. Не могли бы вы дать совет по этому поводу. Tnx ..   -  person dark horse    schedule 29.05.2018
comment
@ tobi6, спасибо, я пытаюсь соотнести каждую из этих точек в рабочем сценарии. Мой вопрос в том, что нам нужно, чтобы bash_command правильно передавала команду task_instance.xcom_pull. Также, если у задания есть разные подфункции, как мы можем отслеживать каждую из функций. Поскольку, насколько я понимаю, оператор bash вызывается только один раз в приведенном выше коде. Не могли бы вы пояснить вышесказанное. Tnx ..   -  person dark horse    schedule 29.05.2018
comment
Позвольте нам продолжить это обсуждение в чате.   -  person dark horse    schedule 29.05.2018
comment
@ tobi6, я изменил сценарий на основе предыдущих комментариев и документации (см. Обновленный код (версия 2) в начальном сообщении). В настоящее время я получаю сообщение об ошибке Subtask: NameError: name 'task_instance' не определено. Не могли бы вы посоветовать, что не так в приведенном выше коде. Tnx ..   -  person dark horse    schedule 30.05.2018
comment
Опять же, задайте новый вопрос, почему возникает эта ошибка. Я уже вижу, что не так, но для других это бесполезно, если ваш вопрос продолжает развиваться и есть много-много комментариев (обычно эти неофициальные удаляются позже комментатором)   -  person tobi6    schedule 30.05.2018


Ответы (1)


Для этого есть два возможных решения:

A) Создайте несколько задач для каждой функции

Задачи в Airflow вызываются в отдельных процессах. Переменные, которые определены как global, не будут работать, поскольку вторая задача обычно не может видеть переменные первой задачи.

Представляем: XCOM. Это функция Airflow, и мы уже ответили на несколько вопросов по этому поводу, например здесь (с примерами): Python Airflow - возврат результата из PythonOperator

ИЗМЕНИТЬ

Вы должны предоставить контекст и передать контекст, как написано в примерах. Для вашего примера это будет означать:

  • добавьте provide_context=True, в свой PythonOperator
  • изменить подпись job_run на def job_run(**kwargs):
  • передать kwargs в data_warehouse_login с data_warehouse_login(kwargs) внутри функции

Б) Создайте одну законченную функцию

В этом самом сценарии я бы все равно удалил глобальный (просто позвонил insert_data, вызвал data_warehouse_login изнутри и вернул соединение) и использовал бы только одну задачу.

В случае ошибки выбросить исключение. Airflow справится с этим отлично. Просто убедитесь, что вы поместили соответствующие сообщения в исключение и используйте лучший тип исключения.

person tobi6    schedule 28.05.2018