Случайный сбой в приложении с отслеживанием состояния KafkaStreams

Привет, вот проблема, с которой я сталкиваюсь несколько дней и не могу найти ответ самостоятельно.

Я использую scala streams API v2.0.0.

У меня есть два входящих потока, разветвленных на два обработчика для разделения, и оба объявляют Transformer с использованием общего StateStore.

Чтобы сделать краткий обзор, это выглядит так

def buildStream(builder: StreamsBuilder, config: Config) = {
    val store = Stores.keyValueStoreBuilder[String, AggregatedState](Stores.persistentKeyValueStore(config.storeName), ...)
    builder.addStateStore(store)

    val handlers = List(handler1, handler2)

    builder
      .stream(config.topic)
      .branch(handlers.map(_.accepts).toList: _*) // Dispatch events to the first handler accepting it
      .zip(handlers.toList)                       // (KStream[K, V], Handler)
      .map((h, stream) => h.handle(stream))       // process the event on the correct handler
      .reduce((s1, s2) => s1.merge(s2))           // merge them back as they return the same object
      .to(config.output)

    builder
}

Все мои обработчики выглядят одинаково: принимают событие, выполняют какие-то операции, проходят через метод transform() для получения состояния и выдают агрегат:

class Handler1(config: Config) {
    def accepts(key: String, value: Event): Boolean = ???  // Implementation not needed

    def handle(stream: KStream[String, Event]) = {
        stream
          .(join/map/filter)
          .transform(new Transformer1(config.storeName))
    }
}


class Handler2(config: Config) {
    def accepts(key: String, value: Event): Boolean = ???  // Implementation not needed

    def handle(stream: KStream[String, Event]) = {
        stream
          .(join/map/filter)
          .transform(new Transformer2(config.storeName))
    }
}

Преобразователи используют один и тот же StateStore со следующей логикой: для нового события проверить, существует ли его агрегат, если да, обновить его + сохранить его + сгенерировать новый агрегат, в противном случае построить агрегат + сохранить его + сгенерировать.

class Transformer1(storeName: String) {
    private var store: KeyValueStore[String, AggregatedState] = _

    override def init(context: ProcessorContext): Unit = {
        store = context.getStateStore(storeName).asInstanceOf[KeyValueStore[K, AggregatedState]]
    }

    override def transform(key: String, value: Event): (String, AggregatedState) = {
        val existing: Option[AggregatedState] = Option(store.get(key))
        val agg = existing.map(_.updateWith(event)).getOrElse(new AggregatedState(event))

        store.put(key, agg)
        if(agg.isTerminal){
          store.delete(key)
        }
        if(isDuplicate(existing, agg)){
            null                              // Tombstone, we have a duplicate
        } else{
            (key, agg)                        // Emit the new aggregate
        }
    }

    override def close() = Unit
}


class Transformer2(storeName: String) {
    private var store: KeyValueStore[String, AggregatedState] = _

    override def init(context: ProcessorContext): Unit = {
        store = context.getStateStore(storeName).asInstanceOf[KeyValueStore[K, AggregatedState]]
    }

    override def transform(key: String, value: Event): (String, AggregatedState) = {
        val existing: Option[AggregatedState] = Option(store.get(key))
        val agg = existing.map(_.updateWith(event)).getOrElse(new AggregatedState(event))

        store.put(key, agg)
        if(agg.isTerminal){
          store.delete(key)
        }
        if(isDuplicate(existing, agg)){
            null                              // Tombstone, we have a duplicate
        } else{
            (key, agg)                        // Emit the new aggregate
        }
    }

    override def close() = Unit
}

Transformer2 тот же, меняется только бизнес-логика (как слить новое событие с агрегированным состоянием)

У меня проблема в том, что при запуске потока у меня может быть либо нормальный запуск, либо загрузочное исключение:

15:07:23,420 ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks  - stream-thread [job-tracker-prod-5ba8c2f7-d7fd-48b5-af4a-ac78feef71d3-StreamThread-1] Failed to commit stream task 1_0 due to the following error:
org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000003
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:198)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:406)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:380)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:368)
    at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:362)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:352)
    at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:401)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1035)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:845)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:161)
    at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:66)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:206)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
    at com.mycompany.streamprocess.Transformer1.transform(Transformer1.scala:49) // Line with store.put(key, agg)

Я уже искал и получил результаты с «Трансформатор использует заводской шаблон», который используется здесь (поскольку .transform берет трансформатор и создает TransformerSupplier под капотом). Поскольку ошибка является псевдослучайной (я мог бы воссоздать ее несколько раз), я предполагаю, что это может быть состояние гонки при запуске, но я не нашел ничего окончательного. Это потому, что я использую одно и то же хранилище состояний на разных трансформаторах?


person whisust    schedule 22.11.2018    source источник


Ответы (1)


Я предполагаю, что вы нажимаете https://issues.apache.org/jira/browse/KAFKA-7250

Исправлено в версиях 2.0.1 и 2.1.0.

Если вы не можете обновиться, вам нужно явно передать TransformerSupplier, потому что Scale API неправильно конструирует поставщика в 2.0.0.

.transform(() => new Transformer1(config.storeName))
person Matthias J. Sax    schedule 23.11.2018
comment
Действительно, это проблема (и решение). Обновление до 2.1.0 решило мою проблему. Благодарю вас! - person whisust; 23.11.2018