Как контролировать работу Spark с помощью Airflow

Я настроил несколько dags, которые в конечном итоге заканчиваются командой spark-submit для искрового кластера. Я использую кластерный режим, если это имеет значение. В любом случае, мой код работает, но я понял, что если задание искры потерпит неудачу, я не обязательно узнаю об этом из пользовательского интерфейса Airflow. Запуская задание в кластерном режиме, Airflow передает задание доступному работнику, поэтому airflow ничего не знает о задании Spark.

Как я могу решить эту проблему?


person sdot257    schedule 18.05.2017    source источник


Ответы (3)


Воздушный поток (начиная с версии 1.8) имеет

SparkSqlOperator - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/spark_sql_operator.py ;
SparkSQLHook code - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_sql_hook.py
SparkSubmitOperator - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/spark_submit_operator.py
SparkSubmitHook code - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py

Если вы используете их, задача воздушного потока завершится ошибкой, если не удастся выполнить задание искры. Возможно, вам придется изменить часть журнала в файле spark_submit_hook, если вы используете spark1.x для получения журналов в реальном времени, потому что spark-submit записывает даже ошибки в стандартный вывод для некоторых версий 1.x (мне пришлось внести изменения для 1.6. 1).

Также обратите внимание, что с момента последнего стабильного выпуска в SparkSubmitOperator было внесено множество улучшений.

person Him    schedule 14.06.2017

Вы можете рассмотреть возможность использования режима client, так как клиент не завершит работу, пока не завершится задание spark. Исполнитель воздушного потока может получить код выхода.

В противном случае вам может понадобиться использовать сервер заданий. Посетите https://github.com/spark-jobserver/spark-jobserver.

person Derek Chan    schedule 18.05.2017
comment
Мы рассмотрели это, но разве нам не нужно, чтобы наш блок воздушного потока был частью кластера Spark, если бы мы использовали режим client? Я все еще новичок в Spark, когда мы попробовали режим client, никакие задания не запускались, пока я не запустил рабочий Spark на указанном поле. - person sdot257; 18.05.2017
comment
Вам нужно будет запустить spark-submit на том же хосте, что и рабочий Airflow. Этот рабочий узел должен иметь возможность взаимодействовать с кластером Spark. - person Derek Chan; 18.05.2017

Вы можете начать использовать LivyOperator, который позаботится о мониторинге задания, LivyOperator будет опрашивать состояние вашего задания Spark с интервалом, который вы настроите для опроса. Пример:

kickoff_streamer_task = LivyOperator(
    task_id='kickoff_streamer_task',
    dag=dag,
    livy_conn_id='lokori',
    file='abfs://[email protected]/user/draxuser/drax_streamer.jar',
    **polling_interval=60**,  # used when you want to pull the status of submitted job
    queue='root.ids.draxuser',
    proxy_user='draxuser',
    args=['10', '3000'],
    num_executors=4,
    conf={
        'spark.shuffle.compress': 'false',
        'master': 'yarn',
        'deploy_mode': 'cluster',
        'spark.ui.view.acls': '*'
    },
    class_name='com.apple.core.drax.dpaas.batch.DraxMatrixProducer',
    on_success_callback=livy_callback,
    on_failure_callback=_failure_callback
)

В приведенном выше примере polling_interval установлен на 60 секунд, он будет продолжать опрашивать статус вашей работы через 60 секунд, он обязательно даст вам правильный статус вашей работы.

person rahul sharma    schedule 12.06.2021