У меня есть потоковое приложение на основе Apache Flink со следующей настройкой:
- Источник данных: данные генерируются каждую минуту.
- Оконный поток с использованием CountWindow с размером = 100, слайдом = 1 (скользящее окно подсчета).
- ProcessWindowFunction для применения некоторых вычислений (скажем, F (x)) к данным в окне.
- Приемник данных для потребления выходного потока
Это нормально работает. Теперь я хотел бы позволить пользователям предоставлять функцию G (x) и применять ее к текущим данным в окне и отправлять вывод пользователю в режиме реального времени.
Я не спрашиваю, как применить произвольную функцию G (x) - для этого я использую динамические сценарии. Я спрашиваю, как получить доступ к буферизованным данным в окне из функции карты другого потока.
Некоторый код для уточнения
DataStream<Foo> in = .... // source data produced every minute
in
.keyBy(new MyKeySelector())
.countWindow(100, 1)
.process(new MyProcessFunction())
.addSink(new MySinkFunction())
// The part above is working fine. Note that windowed stream created by countWindow() function above has to maintain internal buffer. Now the new requirement
DataStream<Function> userRequest = .... // request function from user
userRequest.map(new MapFunction<Function, FunctionResult>(){
public FunctionResult map(Function Gx) throws Exception {
Iterable<Foo> windowedDataFromAbove = // HOW TO GET THIS???
FunctionResult result = Gx.apply(windowedDataFromAbove);
return result;
}
})