Apache Flink - доступ к внутреннему буферу WindowedStream из MapFunction другого потока

У меня есть потоковое приложение на основе Apache Flink со следующей настройкой:

  • Источник данных: данные генерируются каждую минуту.
  • Оконный поток с использованием CountWindow с размером = 100, слайдом = 1 (скользящее окно подсчета).
  • ProcessWindowFunction для применения некоторых вычислений (скажем, F (x)) к данным в окне.
  • Приемник данных для потребления выходного потока

Это нормально работает. Теперь я хотел бы позволить пользователям предоставлять функцию G (x) и применять ее к текущим данным в окне и отправлять вывод пользователю в режиме реального времени.

Я не спрашиваю, как применить произвольную функцию G (x) - для этого я использую динамические сценарии. Я спрашиваю, как получить доступ к буферизованным данным в окне из функции карты другого потока.

Некоторый код для уточнения

DataStream<Foo> in  = .... // source data produced every minute
    in
       .keyBy(new MyKeySelector())
       .countWindow(100, 1)
       .process(new MyProcessFunction())
       .addSink(new MySinkFunction())

// The part above is working fine. Note that windowed stream created by countWindow() function above has to maintain internal buffer. Now the new requirement

DataStream<Function> userRequest  = .... // request function from user

userRequest.map(new MapFunction<Function, FunctionResult>(){
   public FunctionResult map(Function Gx) throws Exception {
         Iterable<Foo> windowedDataFromAbove = // HOW TO GET THIS???
         FunctionResult result = Gx.apply(windowedDataFromAbove);
         return result;

   }

})


person Sebastian    schedule 21.04.2018    source источник


Ответы (2)


Соедините два потока, затем используйте CoProcessFunction. Вызов метода, который получает поток функций, может применить их к тому, что находится в окне вызова другого метода.

Если вы хотите транслировать функции, вам нужно либо использовать Flink 1.5 (который поддерживает соединение ключевых и широковещательных потоков), либо использовать некоторые вертолетные трюки для создания единого потока, который может содержать типы Foo и Function с соответствующей репликацией. функций (и ключевых поколений) для моделирования трансляции.

person kkrugler    schedule 22.04.2018
comment
Это была моя первая мысль. Но API не поддерживает соединение с WindowedStream ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/ - person Sebastian; 22.04.2018
comment
В (Co) ProcessFunction вы можете создавать собственные окна, контролируя срабатывание таймеров. - person kkrugler; 22.04.2018
comment
В любом случае реализация собственного управления окнами будет намного эффективнее - с помощью этих скользящих окон подсчета каждое событие копируется в 100 оконных объектов. - person David Anderson; 23.04.2018
comment
Это действительно так? Зачем Flink сделать 100 копий? Скользящее окно счетчика может быть реализовано с буфером фиксированного размера. - person Sebastian; 23.04.2018

Предполагая, что Fx агрегирует входящие foo на лету, а Gx обрабатывает количество foo для окна, вы должны быть в состоянии достичь того, что хотите, следующим образом:

DataStream<Function> userRequest  = .... // request function from user
Iterator<Function> iter = DataStreamUtils.collect(userRequest);
Function Gx = iter.next();

DataStream<Foo> in  = .... // source data
 .keyBy(new MyKeySelector())
 .countWindow(100, 1)
 .fold(new ArrayList<>(), new MyFoldFunc(), new MyProcessorFunc(Gx))
 .addSink(new MySinkFunction())

Функция складывания (работает с входящими данными, как только они поступают) может быть определена следующим образом:

private static class MyFoldFunc implements FoldFunction<foo, Tuple2<Integer, List<foo>>> {
    @Override
    public Tuple2<Integer, List<foo>> fold(Tuple2<Integer, List<foo>> acc, foo f) {
        acc.f0 = acc.f0 + 1; // if Fx is a simple aggregation (count)
        acc.f1.add(foo);
        return acc;
    }
}

Функция процессора может быть примерно такой:

public class MyProcessorFunc
    extends ProcessWindowFunction<Tuple2<Integer, List<foo>>, Tuple2<Integer, FunctionResult>, String, TimeWindow> {

    public MyProcessorFunc(Function Gx) {
        super();
        this.Gx = Gx;
    }

    @Override
    public void process(String key, Context context,
                        Iterable<Tuple2<Integer, List<foo>> accIt,
                        Collector<Tuple2<Integer, FunctionResult>> out) {
        Tuple2<Integer, List<foo> acc = accIt.iterator().next();
        out.collect(new Tuple2<Integer, FunctionResult>(
            acc.f0, // your Fx aggregation
            Gx.apply(acc.f1), // your Gx results
        ));
    }
}

Обратите внимание, что функции fold \ reduce по умолчанию не буферизуют элементы внутри. Здесь мы используем свертку для вычисления метрик на лету, а также для создания списка элементов окна.

Если вы заинтересованы в применении Gx к переворачивающимся окнам (а не к скольжению), вы можете использовать переворачивающиеся окна в своем конвейере. Для вычисления скользящих счетчиков тоже у вас может быть другая ветвь конвейера, которая вычисляет только скользящие счетчики (не применяет Gx). Таким образом, вам не нужно хранить 100 списков на окно.

Примечание: вам может потребоваться добавить следующую зависимость для использования DataStreamUtils:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-contrib</artifactId>
    <version>0.10.2</version>
</dependency>
person Nizar    schedule 25.04.2018
comment
Gx поступает от конечных пользователей в собственном DataStream. В приведенном выше примере кода Gx объявлен как входной параметр для конструктора MyProcessorFunc, но код вызывающего объекта не показывает, как Gx передается конструктору. - person Sebastian; 28.04.2018
comment
@Sebastian Предполагая, что у вас есть функция пользователя, всегда доступная в начале программы, вы можете преобразовать ее в итератор и предоставить в качестве входных данных для MyProcessorFunc, как показано в обновленном ответе. Вы также можете собирать вывод первого потока (списки) в памяти и ждать, пока не поступит запрос пользователя, но это не согласуется с тем, как работает Flink, поскольку вы блокируете вычисления первого потока (и сохраняете его вывод в памяти) до тех пор, пока не придет запрос пользователя! - person Nizar; 29.04.2018
comment
Функция от пользователя (Gx) недоступна в начале программы. Он отправляется конечными пользователями через REST API, который конвертируется в поток и отправляется во Flink для обработки. Однако есть встроенные функции (Fx), которые обрабатываются с оконными данными. Вопрос здесь, как использовать оконные данные, которые уже находятся в памяти (поскольку они используются для вычисления Fx), для вычисления Gx. - person Sebastian; 30.04.2018