Apache Beam - Уточнение ожидаемого поведения подсказки типа вывода в Python SDK

Я пытаюсь понять внутренний SDK Apache Beam Python и в настоящее время читаю часть проверки типов. Я написал очень простой конвейер, как показано ниже:

class AddZeroFn(beam.DoFn):
  def process(self, element):
    return [element + '0']

def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_options = PipelineOptions(pipeline_args, pipeline_type_check=True)
  pipeline_options.view_as(SetupOptions).save_main_session = True

  with beam.Pipeline(options=pipeline_options) as p:
    numbers = p | beam.Create(['1', '2', '3'])
    numbers = numbers | beam.ParDo(AddZeroFn())
    numbers | 'Write' >> WriteToText('result.txt')

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

Тогда результат

10
20
30

Хорошо, затем я добавил проверку типа для ввода, например

numbers = numbers | beam.ParDo(AddZeroFn().with_input_types(str))

Это нормально, и возникает ошибка, если я изменяю str на int, как ожидалось.

apache_beam.typehints.decorators.TypeCheckError:
Type hint violation for 'ParDo(AddZeroFn)':
requires <type 'int'> but got <type 'str'> for element

Однако, когда я добавил проверку типа вывода, например

numbers = numbers | beam.ParDo(AddZeroFn().with_output_types(float))

Он просто работал без каких-либо проблем. Ошибка не возникает, хотя я думал, что увижу ту же ошибку, что и подсказка ввода. Я неправильно понимаю использование подсказки типа вывода? Если да, могу я спросить, как with_output_type должен себя вести?

Также ptransform.type_check_inputs_or_outputs имеет строки, как показано ниже

  if pvalue_.element_type is None:
    # TODO(robertwb): It's a bug that we ever get here. (typecheck)
    continue
  if hint and not typehints.is_consistent_with(pvalue_.element_type, hint):
    at_context = ' %s %s' % (input_or_output, context) if context else ''
    raise TypeCheckError(
        '%s type hint violation at %s%s: expected %s, got %s' % (
            input_or_output.title(), self.label, at_context, hint,
            pvalue_.element_type))

Однако, если я устанавливаю какой-либо оператор печати в первом блоке if, я вижу, что во многих случаях программа входит в блок, что означает пропуск проверки типа. Я был бы признателен, если бы кто-нибудь мог помочь мне понять, каково текущее правильное поведение в отношении typehint.

Версия Apache Beam - 2.2.0. (Я также тестировал с 2.3.0dev0)

Добавлено (27.12.2017): Я тестировал с помощью DirectRunner, но перешел на DataflowRunner и теперь вижу следующую ошибку. Это то, что мы ожидаем увидеть, когда устанавливаем with_output_types? Когда я устанавливаю with_input_types(int), он терпит неудачу перед отправкой задания в Dataflow, поэтому я подумал, что то же самое произойдет и с типами вывода.

(7b12756b863da949): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute
    op.start()
  File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
    def start(self):
  File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
    with self.scoped_start_state:
  File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
    with self.spec.source.reader() as reader:
  File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
    self.output(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise new_exn, None, original_traceback
  File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
    self.output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 84, in apache_beam.runners.worker.operations.ConsumerSet.receive
    self.update_counters_start(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 90, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
    self.opcounter.update_from(windowed_value)
  File "apache_beam/runners/worker/opcounters.py", line 63, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
    self.do_sample(windowed_value)
  File "apache_beam/runners/worker/opcounters.py", line 81, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
    self.coder_impl.get_estimated_size_and_observables(windowed_value))
  File "apache_beam/coders/coder_impl.py", line 730, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
    def get_estimated_size_and_observables(self, value, nested=False):
  File "apache_beam/coders/coder_impl.py", line 739, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
    self._value_coder.get_estimated_size_and_observables(
  File "apache_beam/coders/coder_impl.py", line 99, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
    return self.estimate_size(value, nested), []
  File "apache_beam/coders/coder_impl.py", line 442, in apache_beam.coders.coder_impl.VarIntCoderImpl.estimate_size
    return get_varint_size(value)
  File "apache_beam/coders/stream.pyx", line 222, in apache_beam.coders.stream.get_varint_size
    cpdef libc.stdint.int64_t get_varint_size(libc.stdint.int64_t value):
TypeError: an integer is required [while running 'ParDo(AddZeroFn)']

person Norio Akagi    schedule 26.12.2017    source источник
comment
Хм, определенно кажется подозрительным. Вы выполняли задание потока данных? У вас есть идентификатор задания потока данных для одного из сбойных заданий?   -  person Lara Schmidt    schedule 27.12.2017
comment
Изучаю это, но это может быть немного медленным из-за праздников. :)   -  person Lara Schmidt    schedule 27.12.2017
comment
@LaraSchmidt Спасибо за ответ. Я тестировал с помощью DirectRunner, но переключился на DataflowRunner и увидел другое поведение. В DirectRunner нет ошибки, когда я устанавливаю with_output_types, но с DataflowRunner возникает ошибка времени выполнения, а with_input_types вызывает ошибку перед отправкой задания в Dataflow (как я и ожидал). Идентификатор вакансии: 2017-12-27_17_08_34-1329704750450101019, но также проверьте мой комментарий, добавленный к вопросу.   -  person Norio Akagi    schedule 28.12.2017


Ответы (1)


Указанные типы вывода используются только для обеспечения согласованности с последующими преобразованиями. Например, если вы написали

numbers2 = numbers | beam.ParDo(AddZeroFn().with_output_types(float)) 
numbers2 | beam.ParDo(...).with_input_types(str)

вы получите ошибку.

person robertwb    schedule 10.01.2018