Ошибка при использовании DataflowPythonOperator воздушного потока для планирования задания потока данных

Я пытаюсь запланировать задания потока данных с помощью воздушного потока DataflowPythonOperator. Вот мой оператор dag:

test = DataFlowPythonOperator(
    task_id = 'my_task',
    py_file = 'path/my_pyfile.py',
        "project": 'my_project',
        "runner": "DataflowRunner",
        "job_name": 'my_job',
        "staging_location": 'gs://my/staging', 
        "temp_location": 'gs://my/temping',
        "requirements_file": 'path/requirements.txt'

Gcp_conn_id настроен и может работать. И ошибка показала, что поток данных завершился неудачно с кодом возврата 1. Полный журнал приведен ниже.

[2018-07-05 18:24:39,928] {gcp_dataflow_hook.py:108} INFO - Start waiting for DataFlow process to complete.
[2018-07-05 18:24:40,049] {base_task_runner.py:95} INFO - Subtask: 
[2018-07-05 18:24:40,049] {models.py:1433} ERROR - DataFlow failed with return code 1
[2018-07-05 18:24:40,050] {base_task_runner.py:95} INFO - Subtask: Traceback (most recent call last):
[2018-07-05 18:24:40,050] {base_task_runner.py:95} INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1390, in run
[2018-07-05 18:24:40,050] {base_task_runner.py:95} INFO - Subtask: result = task_copy.execute(context=context)
[2018-07-05 18:24:40,050] {base_task_runner.py:95} INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/dataflow_operator.py", line 182, in execute
[2018-07-05 18:24:40,050] {base_task_runner.py:95} INFO - Subtask: self.py_file, self.py_options)
[2018-07-05 18:24:40,050] {base_task_runner.py:95} INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 152, in start_python_dataflow
[2018-07-05 18:24:40,050] {base_task_runner.py:95} INFO - Subtask: task_id, variables, dataflow, name, ["python"] + py_options)
[2018-07-05 18:24:40,051] {base_task_runner.py:95} INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 138, in _start_dataflow
[2018-07-05 18:24:40,051] {base_task_runner.py:95} INFO - Subtask: _Dataflow(cmd).wait_for_done()
[2018-07-05 18:24:40,051] {base_task_runner.py:95} INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 119, in wait_for_done
[2018-07-05 18:24:40,051] {base_task_runner.py:95} INFO - Subtask: self._proc.returncode))
[2018-07-05 18:24:40,051] {base_task_runner.py:95} INFO - Subtask: Exception: DataFlow failed with return code 1

Кажется, есть проблема с gcp_dataflow_hook.py, за исключением этого, дополнительной информации нет. Есть ли способ исправить это и есть ли какой-нибудь пример DataflowPythonOperator? (Пока я не нашел ни одного варианта использования)

Ответы (2)

Я не получаю то же сообщение об ошибке, но думаю, что это может помочь. Средство выполнения потока данных python, похоже, завершается странным образом, что не влияет на отдельные задания потока данных, но не может быть правильно обработано классом воздушного потока Python DataFlowPythonOperator. Я отправляю заявку, но вот способ решения моей проблемы. ВАЖНЫЙ! Патч необходимо применить к заданию Dataflow, а не к заданию воздушного потока.

Вверху задания Dataflow добавьте следующие операции импорта

import threading
import time
import types   
from apache_beam.runners.runner import PipelineState

Затем над кодом Dataflow добавьте следующее. Это в основном вырезано и вставлено из основного класса ~ dataflow.dataflow_runner с прокомментированными изменениями.

def local_poll_for_job_completion(runner, result, duration):
    """Polls for the specified job to finish running (successfully or not).
    Updates the result with the new job information before returning.
      runner: DataflowRunner instance to use for polling job state.
      result: DataflowPipelineResult instance used for job information.
      duration (int): The time to wait (in milliseconds) for job to finish.
        If it is set to :data:`None`, it will wait indefinitely until the job
        is finished.
    last_message_time = None
    current_seen_messages = set()

    last_error_rank = float('-inf')
    last_error_msg = None
    last_job_state = None
    # How long to wait after pipeline failure for the error
    # message to show up giving the reason for the failure.
    # It typically takes about 30 seconds.
    final_countdown_timer_secs = 50.0
    sleep_secs = 5.0

    # Try to prioritize the user-level traceback, if any.
    def rank_error(msg):
        if 'work item was attempted' in msg:
            return -1
        elif 'Traceback' in msg:
            return 1
        return 0

    if duration:
        start_secs = time.time()
        duration_secs = duration // 1000

    job_id = result.job_id()
    keep_checking = True  ### Changed here!!!
    while keep_checking:  ### Changed here!!!
        response = runner.dataflow_client.get_job(job_id)
        # If get() is called very soon after Create() the response may not contain
        # an initialized 'currentState' field.
        logging.info("Current state: " + str(response.currentState))
        # Stop looking if the job is not terminating normally
        if str(response.currentState) in (  ### Changed here!!!
                'JOB_STATE_DONE',  ### Changed here!!!
                'JOB_STATE_CANCELLED',  ### Changed here!!!
                # 'JOB_STATE_UPDATED',
                'JOB_STATE_DRAINED',  ### Changed here!!!
                'JOB_STATE_FAILED'):  ### Changed here!!!
            keep_checking = False  ### Changed here!!!
        if response.currentState is not None:
            if response.currentState != last_job_state:
                logging.info('Job %s is in state %s', job_id, response.currentState)
                last_job_state = response.currentState
            if str(response.currentState) != 'JOB_STATE_RUNNING':
                # Stop checking for new messages on timeout, explanatory
                # message received, success, or a terminal job state caused
                # by the user that therefore doesn't require explanation.
                if (final_countdown_timer_secs <= 0.0
                        or last_error_msg is not None
                        or str(response.currentState) == 'JOB_STATE_UPDATED'):  ### Changed here!!!
                    keep_checking = False  ### Changed here!!!

                # Check that job is in a post-preparation state before starting the
                # final countdown.
                if (str(response.currentState) not in (
                        'JOB_STATE_PENDING', 'JOB_STATE_QUEUED')):
                    # The job has failed; ensure we see any final error messages.
                    sleep_secs = 1.0      # poll faster during the final countdown
                    final_countdown_timer_secs -= sleep_secs


        # Get all messages since beginning of the job run or since last message.
        page_token = None
        while True:
            messages, page_token = runner.dataflow_client.list_messages(
                job_id, page_token=page_token, start_time=last_message_time)
            for m in messages:
                message = '%s: %s: %s' % (m.time, m.messageImportance, m.messageText)

                if not last_message_time or m.time > last_message_time:
                    last_message_time = m.time
                    current_seen_messages = set()

                if message in current_seen_messages:
                    # Skip the message if it has already been seen at the current
                    # time. This could be the case since the list_messages API is
                    # queried starting at last_message_time.
                # Skip empty messages.
                if m.messageImportance is None:
                if str(m.messageImportance) == 'JOB_MESSAGE_ERROR':
                    if rank_error(m.messageText) >= last_error_rank:
                        last_error_rank = rank_error(m.messageText)
                        last_error_msg = m.messageText
            if not page_token:

        if duration:
            passed_secs = time.time() - start_secs
            if passed_secs > duration_secs:
                logging.warning('Timing out on waiting for job %s after %d seconds',
                                job_id, passed_secs)

    result._job = response
    runner.last_error_msg = last_error_msg

def local_is_in_terminal_state(self):
    logging.info("Current Dataflow job state: " + str(self.state))
    logging.info("Current has_job: " + str(self.has_job))
    if self.state in ('DONE', 'CANCELLED', 'DRAINED', 'FAILED'):
        return True
        return False

class DataflowRuntimeException(Exception):
    """Indicates an error has occurred in running this pipeline."""

    def __init__(self, msg, result):
        super(DataflowRuntimeException, self).__init__(msg)
        self.result = result

def local_wait_until_finish(self, duration=None):
    logging.info("!!!!!!!!!!!!!!!!You are in a Monkey Patch!!!!!!!!!!!!!!!!")
    if not local_is_in_terminal_state(self):  ### Changed here!!!
        if not self.has_job:
            raise IOError('Failed to get the Dataflow job id.')

        # DataflowRunner.poll_for_job_completion(self._runner, self, duration)
        thread = threading.Thread(
            target=local_poll_for_job_completion,  ### Changed here!!!
            args=(self._runner, self, duration))

        # Mark the thread as a daemon thread so a keyboard interrupt on the main
        # thread will terminate everything. This is also the reason we will not
        # use thread.join() to wait for the polling thread.
        thread.daemon = True
        while thread.isAlive():

        terminated = local_is_in_terminal_state(self)  ### Changed here!!!
        logging.info("Terminated state: " + str(terminated))
        # logging.info("duration: " + str(duration))
        # assert duration or terminated, (  ### Changed here!!!
        #     'Job did not reach to a terminal state after waiting indefinitely.')  ### Changed here!!!

        assert terminated, "Timed out after duration: " + str(duration)  ### Changed here!!!

    else:  ### Changed here!!!
        assert False, "local_wait_till_finish failed at the start"  ### Changed here!!!

    if self.state != PipelineState.DONE:
        # TODO(BEAM-1290): Consider converting this to an error log based on
        # theresolution of the issue.
        raise DataflowRuntimeException(
            'Dataflow pipeline failed. State: %s, Error:\n%s' %
            (self.state, getattr(self._runner, 'last_error_msg', None)), self)

    return self.state

Затем, когда вы запускаете конвейер, используйте соглашение (а не версию 'with beam.Pipeline (options = pipeline_options) p:')

p = beam.Pipeline(options=pipeline_options)

Наконец, когда ваш конвейер будет построен, используйте следующие

result = p.run()
# Monkey patch to better handle termination
result.wait_until_finish = types.MethodType(local_wait_until_finish, result)

Примечание. Это исправление не решит проблему, если вы используете сервер воздушного потока v1.9, как это было с файлом исправления 1.10. Функция файла патча для _Dataflow.wait_for_done не возвращает job_id, и ему тоже нужно. Патч для патча хуже чем выше. Обновите, если можете. Если вы не можете вставить следующий код в качестве заголовков в свой скрипт Dag с последней версией файла, он должен работать. airflow / contrib / hooks / gcp_api_base_hook.py, airflow / contrib / hooks / gcp_dataflow_hook.py и airflow / contrib / operators / dataflow_operator.py

я добавил

import threading
import time
import types   
from apache_beam.runners.runner import PipelineState

изменил предложение with на p = beam.Pipeline(options=pipeline_options), и я закончил его с result.wait_until_finish() Моя DAG успешно

