Событие Flink испускает, когда более раннее совпадающее событие в паре не найдено

У меня есть два потока событий: один генерирует событие, сигнализирующее о начале жизни элемента, и другой поток, который генерирует событие, сигнализирующее об окончании срока жизни элемента. (Потоки могут быть объединены на itemId.)

Как я могу генерировать новое событие в Flink для каждого itemId1, только имеющего событие "конец жизненного цикла", а не соответствующее начало? (Эти начальные и конечные события могут быть разделены на часы или дни.)


person epb    schedule 03.12.2016    source источник
comment
Гарантировано ли в вашем потоке, что событие начала жизни всегда предшествует событию конца жизни? Так что, если вы видели событие eol, события bol больше не будет.   -  person Robert Metzger    schedule 05.12.2016
comment
Да, можно предположить, что события всегда располагаются правильно   -  person epb    schedule 05.12.2016


Ответы (1)


Функциональность можно реализовать с помощью FlatMapFunction с отслеживанием состояния на KeyedStream.

Следующий фрагмент кода должен делать в значительной степени то, что вы ищете.

val stream1: DataStream[Event1] = ???
val stream2: DataStream[Event2] = ???

// map both streams to their ID and a isStart flag to have a common type
val ids1: DataStream[(Int, Boolean)] = stream1.map(e => (e.id, true) )
val ids2: DataStream[(Int, Boolean)] = stream2.map(e => (e.id, false) )

// union both streams
val ids = ids1.union(ids2)

// use a stateful FlatMapFunction to check 
val onlyEOL: DataStream[Int] = ids
  // organize stream by ID
  .keyBy(_._1)
  // use stateful FlatMapFunction to check that bol arrived before eol
  .flatMapWithState { 
    (value: (Int, Boolean), state: Option[Boolean]) =>
      if (value._2) {
        // bol event -> emit nothing and set state to true
        ( List(), Some(true))
      } else {
        // eol event
        if (state.isDefined && state.get) {
          // bol was seen before -> emit nothing and remove state
          ( List(), None) 
        } else {
          // bol was NOT seen before -> emit ID and remove state
          ( List(value._1), None)   
        }
      }
  }   
person Fabian Hueske    schedule 07.12.2016