Удаленная отладка задания Apache Beam на кластере flink

Я выполняю задание потокового луча в кластере flink, где я получаю следующее исключение.

Caused by: org.apache.beam.sdk.util.UserCodeException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
        at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:218)
        at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:183)
        at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:544)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:941)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:895)
        at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:252)
        at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:74)
        at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:576)
        at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
        at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Expect srcResourceIds and destResourceIds have the same scheme, but received alluxio, file.
        at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
        at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:218)
        at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:183)
        at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:544)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:941)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:895)
        at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:252)
        at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:74)
        at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:576)
        at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
        at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
        at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:218)
        at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:183)
        at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:544)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Expect srcResourceIds and destResourceIds have the same scheme, but received alluxio, file.
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:141)
        at org.apache.beam.sdk.io.FileSystems.validateSrcDestLists(FileSystems.java:428)
        at org.apache.beam.sdk.io.FileSystems.rename(FileSystems.java:308)
        at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.moveToOutputFiles(FileBasedSink.java:755)
        at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:850)

Задание потоковой передачи - это получение данных из источника apache pulsar и запись выходных данных в озеро данных Alluxio в формате файла parquet. Я использую scio Spotify для написания этой работы на Scala. Небольшой фрагмент кода, чтобы подчеркнуть, чего я пытаюсь достичь:

    pulsarSource
      .open(sc)
      .withFixedWindows(Duration.standardSeconds(windowDuration))
      .toSinkTap(sink)

Из исключения я вижу, что исходный и выходной пути должны иметь одинаковую схему URI, но я не знаю, как это происходит, потому что я использую путь alluxio в качестве выходного каталога. Есть несколько временных каталогов, которые создаются в выходном каталоге alluxio, но после WindowDuration, когда создается выходной файл, возникает это исключение. У меня было сомнение, что временное местоположение может быть настроено по умолчанию для локальной файловой системы, поэтому я установил его для вывода пути к каталогу (alluxio dir path), но это ничего не изменило.

sc.options.setTempLocation(outputDir)

Я хочу выполнить удаленную отладку, чтобы разобраться в проблеме. Я пробовал этот документ для удаленного отладка на узле исполнителя задачи, но как только моя IntelliJ IDE подключается к узлу, я не попадаю в точку останова.

Может кто-нибудь подсказать, как я могу отладить или получить дополнительную информацию об этой проблеме. Спасибо


person Zahid Adeel    schedule 31.08.2020    source источник


Ответы (1)


Удаленная отладка может быть довольно сложной, но давайте сначала попробуем следующее: убедитесь, что вы подключаетесь к диспетчеру задач, а не к диспетчеру заданий (легко проверить с помощью имен потоков). Затем убедитесь, что у вас было много повторных попыток, чтобы вы не пропустили выполнение задачи, поскольку присоединение отладки может занять некоторое время.

Также полезно дважды проверить, соответствуют ли номера строк трассировки стека вашей версии кода в среде IDE. Если Flink / Beam предустановлен, они могут запускать немного другую версию, и ваша точка останова недействительна. Просто вставьте трассировку стека в свою среду IDE и проверьте, соответствует ли каждая строка ожидаемому. Наконец, добавьте еще несколько точек останова в центральных местах, таких как org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202), чтобы убедиться, что установка вообще работает.

Однако удаленная отладка обычно не рекомендуется для систем с большими данными. Сначала вы должны убедиться на местном уровне, что большая часть вещей работает сама по себе с помощью некоторых ИТ-тестов и локальных исполнителей. Затем вы можете добавить тесты e2e с контейнерами докеров и локальным мини-кластером. Кроме того, вы должны добавить множество операторов ведения журнала, которые можно включать и выключать с помощью конфигурации ведения журнала. Точно так же, если вы установите уровень ведения журнала для отладки, существующих операторов журнала фреймворков может быть уже достаточно для получения некоторой информации. Одна важная вещь, на которую вы всегда должны смотреть, - это сгенерированная топология, которую вы можете видеть в веб-интерфейсе. Может быть, он уже подскажет вам пути, о которых идет речь.

person Arvid Heise    schedule 02.09.2020