Ключ потока данных Flink с помощью составного ключа

Мой вопрос очень похож на Как поддерживать несколько KeyBy во Flink, просто этот вопрос касается Java, мне нужен ответ на Scala. Я скопировал и вставил предоставленное решение в IntelliJ, он автоматически преобразовал скопированный фрагмент в Scala, который я затем отредактировал, чтобы он соответствовал моему коду. Я все еще получаю ошибки компиляции (даже до компиляции IntelliJ может обнаружить проблему с кодом). В основном аргумент, предоставленный keyBy (возвращаемое значение функции keySelector getKey), не соответствует аргументам, ожидаемым любой перегруженной версией функции keyBy.

Я просмотрел множество примеров кода Scala для KeySelector, который возвращает составной ключ, но не нашел ни одного.

import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.tuple.Tuple2
import org.myorg.aarna.AAPerMinData
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy(new 
    KeySelector[AAPerMinData, Tuple2[String, String]]() {
    @throws[Exception]
    override def getKey(value: AAPerMinData): Tuple2[String, String] = 
    Tuple2.of(value.field1, value.field2)  
})

При компиляции кода я получаю следующую ошибку:

Error:(213, 64) overloaded method value keyBy with alternatives:
[K](fun: org.myorg.aarna.AAPerMinData => K)(implicit evidence $2:org.apache.flink.api.common.typeinfo.TypeInformation[K])org.apache.flink.streaming.api.scala.KeyedStream[org.myorg.aarna.AAPerMinData,K] <and>
(firstField: String,otherFields: 
String*)org.apache.flink.streaming.api.scala.KeyedStream[org.myorg.aarna.AAPerMinData,org.apache.flink.api.java.tuple.Tuple] <and>
(fields: Int*)org.apache.flink.streaming.api.scala.KeyedStream[org.myorg.aarna.AAPerMinData,org.apache.flink.api.java.tuple.Tuple]
cannot be applied to (org.apache.flink.api.java.functions.KeySelector[org.myorg.aarna.AAPerMinData,org.apache.flink.api.java.tuple.Tuple2[String,String]])
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy(new KeySelector[AAPerMinData, Tuple2[String, String]]() {

Я не уверен, что мне не хватает в синтаксисе, вызывающем эту ошибку. Любая помощь приветствуется. Следующий шаг, как только это будет решено, - выполнить суммирование на основе TumblingWindow на основе составного ключа.

Обновление 1 (29.12.2018): изменил код, чтобы использовать простое поле типа String в качестве ключа с использованием формата KeySelector (я понимаю, что это можно сделать гораздо проще, я сделать это таким образом, чтобы получить базовую работу KeySelector).

  import org.apache.flink.api.java.functions.KeySelector
  import org.myorg.aarna.AAPerMinData
  val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy(new KeySelector[AAPerMinData, String]() {
    @throws[Exception]
    override def getKey(value: AAPerMinData): String = value.set1.sEntId
  })

Вот снимок экрана с ошибкой, которую я получаю (т.е. IntelliJ показывает это при наведении курсора мыши). введите здесь описание изображения

Обновление 2 (29.12.2018)

Это работает (для случая с одним ключом)

val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy[String] 
(_.set1.sEntId)

Это не работает (для случая составного ключа)

val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy([String, String)](_.set1.sEntId, _.set1.field2)

Обновление 3 (29.12.2018) Пробовал следующее, не смог заставить работать. См. Снимок экрана с ошибкой.

val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy[(String, String)]((_.set1.sEntId, _.set1.field2))

введите здесь описание изображения

Обновление 4 (30.12.2018) Сейчас решено, см. принятый ответ. Для всех, кто может быть заинтересован, это окончательный рабочий код, включающий использование составного ключа для агрегирования:

// Composite key
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy[(String, String)](x => (x.set1.sEntId, x.set1.field2))

// Tumbling window
val aggr_keyed_stream = aa_stats_keyed_stream.window(TumblingEventTimeWindows.of(Time.seconds(60)))

// all set for window based aggregation of a "composite keyed" stream
val aggr_stream = aggr_keyed_stream.apply { (key: (String, String), window: TimeWindow, events: Iterable[AAPerMinData],
                                                 out: Collector[AAPerMinDataAggr]) =>
      out.collect(AAPerMinDataAggrWrapper(key._1 + key._2, // composite
        key._1, key._2, // also needed individual pieces
        window,
        events,
        stream_deferred_live_duration_in_seconds*1000).getAAPerMinDataAggr)}
// print the "mapped" stream for debugging purposes
aggr_stream.print()

person Amit Arora    schedule 29.12.2018    source источник


Ответы (1)


Прежде всего, хотя в этом нет необходимости, используйте кортежи Scala. В целом это упростит задачу, если только вам по какой-то причине не придется взаимодействовать с кортежами Java.

И тогда не используйте org.apache.flink.api.java.functions.KeySelector. Вы хотите использовать этот ключ От org.apache.flink.streaming.api.scala.DataStream:

/**
 * Groups the elements of a DataStream by the given K key to
 * be used with grouped operators like grouped reduce or grouped aggregations.
 */
def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K] = {

  val cleanFun = clean(fun)
  val keyType: TypeInformation[K] = implicitly[TypeInformation[K]]

  val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
    def getKey(in: T) = cleanFun(in)
    override def getProducedType: TypeInformation[K] = keyType
  }
  asScalaStream(new JavaKeyedStream(stream, keyExtractor, keyType))
}

Другими словами, просто передайте функцию, которая преобразует элементы вашего потока в ключевые значения (в общем, Scala API Flink пытается быть идиоматическим). Итак, что-то вроде этого должно сработать:

aa_stats_stream_w_timestamps.keyBy[String](value => value.set1.sEntId)

Обновлять:

Для случая составного ключа используйте

aa_stats_stream_w_timestamps.keyBy[(String, String)](x => (x.set1.sEntId, x.set1.field2))
person David Anderson    schedule 29.12.2018
comment
Этот конкретный импорт уже присутствовал в начале файла (извините, я не разместил объявления импорта t в начале файла). Чтобы сделать код еще более простым, теперь я пытаюсь использовать KeySelector для несоставного ключа (одно поле типа String), но все равно получаю ту же ошибку. - person Amit Arora; 29.12.2018
comment
См. Обновление 1 выше, я опубликовал упрощенный код и снимок экрана с ошибкой. - person Amit Arora; 29.12.2018
comment
Кстати, вы можете найти больше примеров scala в репозитории тренировочных упражнений Flink на github: github.com/dataArtisans/flink-training-exercises/tree/master/ - person David Anderson; 29.12.2018
comment
Спасибо, я полагаю, вы имели в виду это: val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy [String] (.set1.sEntId), как мне заставить его работать для двух полей (составной ключевой случай). Пытался сделать это, но он не компилируется: val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy [(String, String)] ( .set1.sEntId, _.set1.field2) - person Amit Arora; 29.12.2018
comment
По-прежнему чего-то не хватает, не может заставить его работать. См. Снимок экрана в обновлении 3. - person Amit Arora; 30.12.2018
comment
Большое спасибо за помощь в решении этой проблемы, очень признателен. - person Amit Arora; 30.12.2018
comment
Привет, ты можешь мне помочь с этим вопросом? stackoverflow.com/questions/61922101/ - person Danieledu; 21.05.2020