Внешние файлы в Airflow DAG

Я пытаюсь получить доступ к внешним файлам в Airflow Task, чтобы прочитать некоторый sql, и получаю «файл не найден». Кто-нибудь сталкивался с этим?

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

dag = DAG(
    'my_dat',
    start_date=datetime(2017, 1, 1),
    catchup=False,
    schedule_interval=timedelta(days=1)
)

def run_query():
    # read the query
    query = open('sql/queryfile.sql')
    # run the query
    execute(query)

tas = PythonOperator(
    task_id='run_query', dag=dag, python_callable=run_query)

В журнале указано следующее:

IOError: [Errno 2] No such file or directory: 'sql/queryfile.sql'

Я понимаю, что могу просто скопировать и вставить запрос в один и тот же файл, это действительно не изящное решение. Есть несколько запросов, и текст действительно большой, встраивание его в код Python ухудшит читаемость.


person Alessandro Mariani    schedule 23.03.2017    source источник


Ответы (3)


Вот пример использования переменной, чтобы упростить задачу.

  • Сначала добавьте переменную в Airflow UI -> Admin -> Variable, например. {key: 'sql_path', values: 'your_sql_script_folder'}

  • Затем добавьте следующий код в свой DAG, чтобы использовать только что добавленную переменную из Airflow.

Код DAG:

import airflow
from airflow.models import Variable

tmpl_search_path = Variable.get("sql_path")

dag = airflow.DAG(
   'tutorial',
    schedule_interval="@daily",
    template_searchpath=tmpl_search_path,  # this
    default_args=default_args
)
  • Теперь вы можете использовать имя сценария sql или путь в папке Variable.

  • Подробнее см. в этом

person zhongjiajie    schedule 07.09.2017
comment
Приведите, пожалуйста, полный пример. Определяя template_searchpath, что это меняет общее поведение скрипта, могу ли я теперь ссылаться на файл по его имени? Например, это завершит ваш пример: `` с open (query_file_name, 'r') as file: query_content = file.read () ``? - person Ricardo M S; 12.02.2020
comment
Я не думаю, что это сработает с примером DAG, который OP использует с PythonOperator и собственным open() Python. PythonOperator работает в модуле, который не имеет доступа к тому же набору расположений, что и процесс, который анализирует DAG. - person LondonRob; 17.02.2020
comment
@RicardoMS Привет, когда вы хотите определить свой собственный airflow.models.Variable, самый простой способ - создать новую переменную с помощью интерфейса Airflow, homepage -> Admin -> Variables, например: {'Key': 'RicardoMS_variable', 'Val': '/opt/specific/path'}. После того, как вы закончите, вы можете использовать пример кода для загрузки вашей переменной tmpl_search_path = Variable.get("RicardoMS_variable") вместо прямого использования '/opt/specific/path' - person zhongjiajie; 18.02.2020

Все относительные пути используются относительно переменной среды AIRFLOW_HOME. Пытаться:

  • Предоставление абсолютного пути
  • поместите файл относительно AIRFLOW_HOME
  • попробуйте зарегистрировать PWD в вызываемом Python, а затем решить, какой путь указать (лучший вариант)
person Priyank Mehta    schedule 23.03.2017
comment
Хороший комментарий, но, к сожалению, AIRFLOW_HOME - это необязательная переменная среды - Airflow отлично работает без нее - и вы не можете гарантировать, что она будет установлена. - person Kirk Broadhurst; 16.11.2017

Предполагая, что каталог sql относительно текущего файла Python, вы можете определить абсолютный путь к файлу sql следующим образом:

import os

CUR_DIR = os.path.abspath(os.path.dirname(__file__))

def run_query():
    # read the query
    query = open(f"{CUR_DIR}/sql/queryfile.sql")
    # run the query
    execute(query)
person Jake    schedule 06.05.2021