Динамическое создание DAG на основе строки, доступной при подключении к БД

Я хочу создать динамически созданный DAG из запроса таблицы базы данных. Когда я пытаюсь создать динамически создаваемый DAG из диапазона точного числа или на основе доступного объекта в настройках воздушного потока, это удалось. Однако, когда я пытаюсь использовать PostgresHook и создавать DAG для каждой строки моей таблицы, я вижу новый DAG, генерируемый всякий раз, когда я добавляю новую строку в свою таблицу. Однако оказалось, что я не могу щелкнуть только что созданный DAG в пользовательском интерфейсе веб-сервера воздушного потока. Для большего контекста я использую Google Cloud Composer. Я уже выполнил шаги, указанные в Группы DAG не доступны для кликов на веб-сервере Google Cloud Composer, но нормально работают с локальным потоком воздуха. Однако в моем случае это все еще не работает.

Вот мой код

from datetime import datetime, timedelta

from airflow import DAG
import psycopg2
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from psycopg2.extras import NamedTupleCursor
import os

default_args = {
  "owner": "debug",
  "depends_on_past": False,
  "start_date": datetime(2018, 10, 17),
  "email": ["[email protected]"],
  "email_on_failure": False,
  "email_on_retry": False,
  "retries": 1,
  "retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}


def create_dag(dag_id,
           schedule,
           default_args):
def hello_world_py(*args):
    print 'Hello from DAG: {}'.format(dag_id)

dag = DAG(dag_id,
          schedule_interval=timedelta(days=1),
          default_args=default_args)

with dag:
    t1 = PythonOperator(
        task_id=dag_id,
        python_callable=hello_world_py,
        dag_id=dag_id)

return dag


dag = DAG("dynamic_yolo_pg_", default_args=default_args,     
        schedule_interval=timedelta(hours=1))

"""
Bahavior:
Create an exact DAG which in turn will create it's own file
https://www.astronomer.io/guides/dynamically-generating-dags/
"""
pg_hook = PostgresHook(postgres_conn_id='some_db')
conn = pg_hook.get_conn()
cursor = conn.cursor(cursor_factory=NamedTupleCursor)
cursor.execute("SELECT * FROM airflow_test_command;")
commands = cursor.fetchall()
for command in commands:
  dag_id = command.id
  schedule = timedelta(days=1)

  id = "dynamic_yolo_" + str(dag_id)

  print id

  globals()[id] = create_dag(id,
                           schedule,
                           default_args)

Лучший,


person irvifa    schedule 19.10.2018    source источник
comment
Покажите, пожалуйста, какой код вы пробовали, например хук postgres.   -  person Meghdeep Ray    schedule 19.10.2018
comment
Привет, я уже выложил код, спасибо. Я думаю, это связано с stackoverflow.com/questions/51218314/ Однако проблема все еще сохраняется :(   -  person irvifa    schedule 19.10.2018


Ответы (1)


Эту проблему можно решить с помощью самоуправляемого веб-сервера Airflow, выполнив шаги, упомянутые в [1]. После этого, если вы решите добавить аутентификацию перед своим самоуправляемым веб-сервером, после создания входа ваши BackendServices должны появиться в консоли Google IAP, и вы сможете включить IAP. Если вы хотите получить доступ к своему воздушному потоку программно, вы также можете использовать аутентификацию JWT, используя служебную учетную запись для вашего самоуправляемого веб-сервера Airflow [2].

[1] https://cloud.google.com/composer/docs/how-to/managing/deploy-webserver

[2] https://cloud.google.com/iap/docs/authentication-howto

person irvifa    schedule 08.01.2019