Я выполняю задание потокового луча в кластере flink, где я получаю следующее исключение.
Caused by: org.apache.beam.sdk.util.UserCodeException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:218)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:183)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:544)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:941)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:895)
at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:252)
at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:74)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:576)
at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Expect srcResourceIds and destResourceIds have the same scheme, but received alluxio, file.
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:218)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:183)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:544)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:941)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:895)
at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:252)
at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:74)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:576)
at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)
at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:218)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:183)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:544)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Expect srcResourceIds and destResourceIds have the same scheme, but received alluxio, file.
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:141)
at org.apache.beam.sdk.io.FileSystems.validateSrcDestLists(FileSystems.java:428)
at org.apache.beam.sdk.io.FileSystems.rename(FileSystems.java:308)
at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.moveToOutputFiles(FileBasedSink.java:755)
at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:850)
Задание потоковой передачи - это получение данных из источника apache pulsar и запись выходных данных в озеро данных Alluxio в формате файла parquet. Я использую scio Spotify для написания этой работы на Scala. Небольшой фрагмент кода, чтобы подчеркнуть, чего я пытаюсь достичь:
pulsarSource
.open(sc)
.withFixedWindows(Duration.standardSeconds(windowDuration))
.toSinkTap(sink)
Из исключения я вижу, что исходный и выходной пути должны иметь одинаковую схему URI, но я не знаю, как это происходит, потому что я использую путь alluxio в качестве выходного каталога. Есть несколько временных каталогов, которые создаются в выходном каталоге alluxio, но после WindowDuration
, когда создается выходной файл, возникает это исключение. У меня было сомнение, что временное местоположение может быть настроено по умолчанию для локальной файловой системы, поэтому я установил его для вывода пути к каталогу (alluxio dir path), но это ничего не изменило.
sc.options.setTempLocation(outputDir)
Я хочу выполнить удаленную отладку, чтобы разобраться в проблеме. Я пробовал этот документ для удаленного отладка на узле исполнителя задачи, но как только моя IntelliJ IDE подключается к узлу, я не попадаю в точку останова.
Может кто-нибудь подсказать, как я могу отладить или получить дополнительную информацию об этой проблеме. Спасибо