Мой вопрос очень похож на Как поддерживать несколько KeyBy во Flink, просто этот вопрос касается Java, мне нужен ответ на Scala. Я скопировал и вставил предоставленное решение в IntelliJ, он автоматически преобразовал скопированный фрагмент в Scala, который я затем отредактировал, чтобы он соответствовал моему коду. Я все еще получаю ошибки компиляции (даже до компиляции IntelliJ может обнаружить проблему с кодом). В основном аргумент, предоставленный keyBy (возвращаемое значение функции keySelector getKey), не соответствует аргументам, ожидаемым любой перегруженной версией функции keyBy.
Я просмотрел множество примеров кода Scala для KeySelector, который возвращает составной ключ, но не нашел ни одного.
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.tuple.Tuple2
import org.myorg.aarna.AAPerMinData
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy(new
KeySelector[AAPerMinData, Tuple2[String, String]]() {
@throws[Exception]
override def getKey(value: AAPerMinData): Tuple2[String, String] =
Tuple2.of(value.field1, value.field2)
})
При компиляции кода я получаю следующую ошибку:
Error:(213, 64) overloaded method value keyBy with alternatives:
[K](fun: org.myorg.aarna.AAPerMinData => K)(implicit evidence $2:org.apache.flink.api.common.typeinfo.TypeInformation[K])org.apache.flink.streaming.api.scala.KeyedStream[org.myorg.aarna.AAPerMinData,K] <and>
(firstField: String,otherFields:
String*)org.apache.flink.streaming.api.scala.KeyedStream[org.myorg.aarna.AAPerMinData,org.apache.flink.api.java.tuple.Tuple] <and>
(fields: Int*)org.apache.flink.streaming.api.scala.KeyedStream[org.myorg.aarna.AAPerMinData,org.apache.flink.api.java.tuple.Tuple]
cannot be applied to (org.apache.flink.api.java.functions.KeySelector[org.myorg.aarna.AAPerMinData,org.apache.flink.api.java.tuple.Tuple2[String,String]])
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy(new KeySelector[AAPerMinData, Tuple2[String, String]]() {
Я не уверен, что мне не хватает в синтаксисе, вызывающем эту ошибку. Любая помощь приветствуется. Следующий шаг, как только это будет решено, - выполнить суммирование на основе TumblingWindow на основе составного ключа.
Обновление 1 (29.12.2018): изменил код, чтобы использовать простое поле типа String в качестве ключа с использованием формата KeySelector (я понимаю, что это можно сделать гораздо проще, я сделать это таким образом, чтобы получить базовую работу KeySelector).
import org.apache.flink.api.java.functions.KeySelector
import org.myorg.aarna.AAPerMinData
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy(new KeySelector[AAPerMinData, String]() {
@throws[Exception]
override def getKey(value: AAPerMinData): String = value.set1.sEntId
})
Вот снимок экрана с ошибкой, которую я получаю (т.е. IntelliJ показывает это при наведении курсора мыши).
Обновление 2 (29.12.2018)
Это работает (для случая с одним ключом)
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy[String]
(_.set1.sEntId)
Это не работает (для случая составного ключа)
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy([String, String)](_.set1.sEntId, _.set1.field2)
Обновление 3 (29.12.2018) Пробовал следующее, не смог заставить работать. См. Снимок экрана с ошибкой.
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy[(String, String)]((_.set1.sEntId, _.set1.field2))
Обновление 4 (30.12.2018) Сейчас решено, см. принятый ответ. Для всех, кто может быть заинтересован, это окончательный рабочий код, включающий использование составного ключа для агрегирования:
// Composite key
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy[(String, String)](x => (x.set1.sEntId, x.set1.field2))
// Tumbling window
val aggr_keyed_stream = aa_stats_keyed_stream.window(TumblingEventTimeWindows.of(Time.seconds(60)))
// all set for window based aggregation of a "composite keyed" stream
val aggr_stream = aggr_keyed_stream.apply { (key: (String, String), window: TimeWindow, events: Iterable[AAPerMinData],
out: Collector[AAPerMinDataAggr]) =>
out.collect(AAPerMinDataAggrWrapper(key._1 + key._2, // composite
key._1, key._2, // also needed individual pieces
window,
events,
stream_deferred_live_duration_in_seconds*1000).getAAPerMinDataAggr)}
// print the "mapped" stream for debugging purposes
aggr_stream.print()