Не удалось восстановить состояния контрольных точек с помощью Flink Processor API.

Основная программа потребляет события кафки, затем фильтрует - ›карта -› keyBy - ›CEP -› сток. Я написал еще одну отдельную простую программу для чтения каталога контрольных точек, например:

object StateReader extends App {

  val path = "file://...."

  val env = ExecutionEnvironment.getExecutionEnvironment

  val chk = Savepoint.load(env.getJavaEnv, path, new FsStateBackend(path))

  val ds = chk.readKeyedState("cep", new CepOperatorReadFunction, TypeInformation.of(classOf[KEY]), TypeInformation.of(classOf[VALUE]))
  println(ds.count())

}

class CepOperatorReadFunction extends KeyedStateReaderFunction[KEY, VALUE] {
  override def open(parameters: Configuration): Unit = {

  }
  override def readKey(k: KEY, context: KeyedStateReaderFunction.Context, collector: Collector[VALUE]): Unit = {

  }//end readKey
}//end class CepOperatorReadFunction

Однако я получил следующее исключение:

Caused by: java.lang.IllegalStateException: Unexpected state handle type, expected: class org.apache.flink.runtime.state.KeyGroupsStateHandle, but found: class org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:120)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
    ... 13 more

Вот некоторые конфигурации в flink-conf.yaml

state.backend: rocksdb
state.checkpoints.dir: hdfs:///.../checkpoints
state.savepoints.dir: hdfs:///.../savepoints
state.backend.incremental: true
state.backend.rocksdb.memory.write-buffer-ratio: 0.6
state.backend.rocksdb.localdir: /var/lib/.../rocksdb
execution.checkpointing.interval: 900000
execution.checkpointing.timeout: 600000
execution.checkpointing.unaligned: true
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 0

Есть идеи, почему произошло исключение и как решить проблему?

Спасибо


person Grant    schedule 27.10.2020    source источник
comment
Нет никакой гарантии, что API обработчика состояний может считывать контрольные точки, и есть большая вероятность, что он не работает с инкрементными контрольными точками. Можете ли вы вместо этого попробовать использовать точку сохранения?   -  person David Anderson    schedule 27.10.2020
comment
В основной программе я установил uid следующим образом: CEP.pattern(trafficStream.javaStream, pattern, new EventComparator[VALUE]{...}.process(new PatternProcessFunction[VALUE, String]{...}.uid("cep").name("cep") . когда я попробовал chk.readKeyedState("cep",....), я получил Caused by: java.lang.IllegalStateException: Unexpected key-group in restore.   -  person Grant    schedule 27.10.2020


Ответы (1)


Нет готовой поддержки, чтобы облегчить чтение состояния оператора CEP. Итак, чтобы реализовать свой KeyedStateReaderFunction, вам придется покопаться в реализации CEP, найти используемые ValueState и MapState и реализовать считыватель, который использует те же дескрипторы состояния.

person David Anderson    schedule 27.10.2020