Airflow BashOperator OSError: [Errno 2] Нет такого файла или каталога

Я продолжаю получать ту же ошибку от запланированного BashOperator, который в настоящее время выполняет обратное заполнение (он «отстает» более чем на месяц).

[2018-06-10 22:06:33,558] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run dag_name task_name 2018-03-14T00:00:00 --job_id 50 --raw -sd DAGS_FOLDER/dag_file.py']
Traceback (most recent call last):
  File "/anaconda/bin//airflow", line 27, in <module>
    args.func(args)
  File "/anaconda/lib/python2.7/site-packages/airflow/bin/cli.py", line 387, in run
    run_job.run()
  File "/anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 198, in run
    self._execute()
  File "/anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 2512, in _execute
    self.task_runner.start()
  File "/anaconda/lib/python2.7/site-packages/airflow/task_runner/bash_task_runner.py", line 29, in start
    self.process = self.run_command(['bash', '-c'], join_args=True)
  File "/anaconda/lib/python2.7/site-packages/airflow/task_runner/base_task_runner.py", line 120, in run_command
    universal_newlines=True
  File "/anaconda/lib/python2.7/subprocess.py", line 394, in __init__
    errread, errwrite)
  File "/anaconda/lib/python2.7/subprocess.py", line 1047, in _execute_child
    raise child_exception
OSError: [Errno 2] No such file or directory
[2018-06-10 22:06:33,633] {sequential_executor.py:47} ERROR - Failed to execute task Command 'airflow run dag_name task_name 2018-03-14T00:00:00 --local -sd /var/lib/airflow/dags/dag_file.py' returned non-zero exit status 1.

Я помню, что видел что-то, что предполагало, что это может быть проблема с разрешениями, но я не могу понять, какие разрешения могут быть задействованы.

Я использую конфигурацию systemd - и, в конце концов, я начал запускать веб-сервер и планировщик воздушного потока от имени пользователя root.

Я могу взять список в первой строке и ввести его дословно в оболочке ipython в качестве аргументов для экземпляра subprocess.Popen (как в airflow/task_runner/base_task_runner.py; не сохранять envs), и он не только запускается, но и правильно информирует базу данных воздушного потока, что задача выполнена. Я могу сделать это как пользователь Airflow, root или ubuntu.

Я добавил /anaconda/bin в PATH в .bashrc для Airflow, root, ubuntu и /etc/bash.bashrc в дополнение к значению для AIRFLOW_HOME, которое также находится в моем файле env /etc/airflow.

Вот как выглядит моя запись в systemd:

[Unit]
Description=Airflow scheduler daemon
After=network.target postgresql.service mysql.service redis.service rabbitmq-server.service
Wants=postgresql.service mysql.service redis.service rabbitmq-server.service    

[Service]
EnvironmentFile=/etc/airflow
User=root
Group=root
Type=simple
ExecStart=/anaconda/bin/airflow scheduler
Restart=always
RestartSec=5s    

[Install]
WantedBy=multi-user.target

Мой файл env:

PATH=$PATH:/anaconda/bin/
AIRFLOW_HOME=/var/lib/airflow
AIRFLOW_CONFIG=$AIRFLOW_HOME/airflow.cfg

Используя apache-airflow == 1.9.0 и отчаянно ищите решение. Заранее спасибо.

Airflow.cfg:

[core]
airflow_home = /var/lib/airflow
dags_folder = /var/lib/airflow/dags
base_log_folder = /var/lib/airflow/logs
remote_log_conn_id =
encrypt_s3_logs = False
logging_level = INFO
logging_config_class =
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
executor = SequentialExecutor
sql_alchemy_conn = {actual value hidden}
sql_alchemy_pool_size = 5
sql_alchemy_pool_recycle = 3600
parallelism = 4
dag_concurrency = 2
dags_are_paused_at_creation = True
non_pooled_task_slot_count = 16
max_active_runs_per_dag = 1
load_examples = False
plugins_folder = /var/lib/airflow/plugins
fernet_key = {actual value hidden}
donot_pickle = False
dagbag_import_timeout = 30
task_runner = BashTaskRunner
default_impersonation =
security =
unit_test_mode = False
task_log_reader = file.task
enable_xcom_pickling = True
killed_task_cleanup_time = 60
[cli]
api_client = airflow.api.client.local_client
endpoint_url = http://localhost:8080
[api]
auth_backend = airflow.api.auth.backend.default
[operators]
default_owner = root
default_cpus = 1
default_ram = 512
default_disk = 512
default_gpus = 0
[webserver]
base_url = http://localhost:8080
web_server_host = 0.0.0.0
web_server_port = 8080
web_server_ssl_cert =
web_server_ssl_key =
web_server_worker_timeout = 120
worker_refresh_batch_size = 1
worker_refresh_interval = 60
secret_key = temporary_key
workers = 1
worker_class = sync
access_logfile = -
error_logfile = -
expose_config = False
authenticate = False
filter_by_owner = False
owner_mode = user
dag_default_view = tree
dag_orientation = LR
demo_mode = False
log_fetch_timeout_sec = 5
hide_paused_dags_by_default = False
page_size = 100
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
smtp_host = localhost
smtp_starttls = True
smtp_ssl = False
smtp_port = 25
smtp_mail_from = [email protected]
[celery]
...
[dask]
cluster_address = 127.0.0.1:8786
[scheduler]
job_heartbeat_sec = 120
scheduler_heartbeat_sec = 120
run_duration = -1
min_file_process_interval = 0
dag_dir_list_interval = 300
print_stats_interval = 300
child_process_log_directory = /var/lib/airflow/logs/scheduler
scheduler_zombie_task_threshold = 900
catchup_by_default = True
max_tis_per_query = 0
statsd_on = False
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
max_threads = 1
authenticate = False
[ldap]
...
[mesos]
...
[kerberos]
...
[github_enterprise]
...
[admin]
hide_sensitive_variable_fields = True

Добавление ls -hal

root@ubuntu:/var/lib/airflow# ls -hal /var
total 52K
drwxr-xr-x 13 root root   4.0K Jun  3 11:58 .
root@ubuntu:/var/lib/airflow# ls -hal /var/lib
total 164K
drwxr-xr-x 42 root     root     4.0K Jun 10 19:00 .
root@ubuntu:/var/lib/airflow# ls -hal
total 40K
drwxr-xr-x  4 airflow airflow 4.0K Jun 11 06:41 .
drwxr-xr-x 42 root    root    4.0K Jun 10 19:00 ..
-rw-r--r--  1 airflow airflow  13K Jun 11 06:41 airflow.cfg
-rw-r--r--  1 airflow airflow  579 Jun 10 19:00 airflow.conf
drwxr-xr-x  2 airflow airflow 4.0K Jun 10 21:27 dags
drwxr-xr-x  4 airflow airflow 4.0K Jun 10 20:31 logs
-rw-r--r--  1 airflow airflow 1.7K Jun 10 19:00 unittests.cfg
root@ubuntu:/var/lib/airflow# ls -hal dags/
total 16K
drwxr-xr-x 2 airflow airflow 4.0K Jun 10 21:27 .
drwxr-xr-x 4 airflow airflow 4.0K Jun 11 06:41 ..
-rw-r--r-- 1 airflow airflow 3.4K Jun 10 21:26 dag_file.py
-rw-r--r-- 1 airflow airflow 1.7K Jun 10 21:27 dag_file.pyc

и содержимое dag_file.py:

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'root',
    'run_as': 'root',
    'depends_on_past': True,
    'start_date': datetime(2018, 2, 20),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'end_date': datetime(2018, 11, 15),
}
env = {
    'PSQL': '{obscured}',
    'PATH': '/anaconda/bin/:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin',
    'PWD': '/home/ubuntu/{obs1}/',
    'HOME': '/home/ubuntu',
    'PYTHONPATH': '/home/ubuntu/{obs1}',
}
dag = DAG(
    'dag_name',
    default_args=default_args,
    description='',
    schedule_interval=timedelta(days=1))
t1 = BashOperator(
    env=env,
    task_id='dag_file',
    bash_command='export PYTHONPATH=/home/ubuntu/{obs1} && /anaconda/bin/ipython $PYTHONPATH/{obs2}/{obs3}.py {{ ds }}',
    dag=dag)

И я напоминаю вам, что это работает правильно как airflow, root и ubuntu: airflow run dag_name dag_file 2018-03-17T00:00:00 --job_id 55 --raw -sd DAGS_FOLDER/dag_file.py


person artdv    schedule 11.06.2018    source источник
comment
Я добавил airflow.cfg. Однако, если бы это было неправильно настроено, я полагаю, что мой запуск из командной строки также не сработает.   -  person artdv    schedule 11.06.2018
comment
Похоже, вам может потребоваться показать, как выглядит DAG /var/lib/airflow/dags/dag_file.py - можете ли вы отредактировать любую конфиденциальную информацию и опубликовать ее содержимое?   -  person Simon D    schedule 11.06.2018
comment
Что такое chmod дагов в вашем каталоге дагов? Я бы предположил, что если бы это была проблема пути, systemd не смог бы запустить планировщик с самого начала.   -  person cwurtz    schedule 11.06.2018
comment
@CJWurtz добавил код.   -  person artdv    schedule 12.06.2018
comment
@SimonD добавил код.   -  person artdv    schedule 12.06.2018
comment
может попробовать chmod -R 0755 ${AIRLFOW_HOME}/dags, посмотреть, не изменится ли что-нибудь   -  person cwurtz    schedule 13.06.2018
comment
У меня была такая же проблема, и она была связана с планировщиком и тем, как я его настроил.   -  person Mahsa Seifikar    schedule 25.09.2020


Ответы (3)


Похоже на несоответствие версии python, отредактируйте .bashrc с правильной версией python и запустите:

source .bashrc

Это решит вашу проблему.

В моем случае мы используемexport PATH="/opt/miniconda3/bin":$PATH

Также, чтобы проверить, как я могу это сделать:

/opt/miniconda3/bin/python /opt/miniconda3/bin/airflow 

Вот как я управлял воздушным потоком.

person Deepesh Rehi    schedule 30.08.2018

В Airflow v1.10.0 вы просто указываете путь к файлу без пробела в конце.

Пример:

compact_output_task = BashOperator(**{
    'task_id': 'compact_output',
    'bash_command': './compact_output.sh',
    'xcom_push': True,
})
person Wesley Batista    schedule 02.11.2018

Systemd EnvironmentFile не будет расширять переменную внутри него, поэтому ваш PATH будет смотреть только на /anaconda/bin, если вы просто хотите расширить свой PATH, лучше использовать

ExecStart=/bin/bash -c 'PATH=/path/to/venv/bin/:$PATH exec /path/to/airflow scheduler

это решило мою проблему с Нет такого файла или каталога, потому что воздушный поток не смог найти двоичный файл, который я вызывал внутри своего оператора bash.

person Moeen M    schedule 07.06.2019