Как записать результат каждого скользящего окна программы FLINK в новый файл Вместо добавления результата всех окон в один файл

Ниже приведена программа flink (Java), которая считывает твиты из файла, извлекает хэш-теги, подсчитывает количество повторений для каждого хэш-тега и, наконец, записывает в файл.

Теперь в этой программе есть скользящее окно размером 20 секунд, которое скользит на 5 секунд. В приемнике все выходные данные записываются в файл с именем outfile. Означает, что каждые 5 секунд одно окно запускается и записывает данные в файл outfile.

Моя проблема:

Я хочу, чтобы при каждом запуске окна (то есть каждые 5 секунд) данные записывались в новый файл. (вместо добавления в тот же файл). Подскажите где и как это можно сделать? Нужно ли использовать настраиваемый триггер или какую-либо конфигурацию приемника? или что-нибудь еще?

Код:

<!-- language: lang-java -->

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

env.getConfig().setAutoWatermarkInterval(100);

env.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);

String path = "C:\\Users\\eventTime";
// Reading data from files of folder eventTime.
DataStream<String> streamSource = env.readFile(new TextInputFormat(new Path(path)), path, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000).uid("read-1");

//Extracting the hash tags of tweets
DataStream<Tuple3<String, Integer, Long>> mapStream = streamSource.map(new ExtractHashTagFunction());   

//generating watermarks and extracting the timestamps from tweets
DataStream<Tuple3<String, Integer, Long>> withTimestampsAndWatermarks = mapStream.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

KeyedStream<Tuple3<String, Integer, Long>,Tuple> keyedStream = withTimestampsAndWatermarks.keyBy(0);

//Using sliding window of 20 seconds which slide by 5 seconds.
SingleOutputStreamOperator<Tuple4<String, Integer, Long, String>> aggregatedStream = keyedStream.**window(SlidingEventTimeWindows.of(Time.seconds(20),Time.seconds(5)))**
        .aggregate(new AggregateHashTagCountFunction()).uid("agg-123");                 

aggregatedStream.writeAsText("C:\\Users\\outfile", WriteMode.NO_OVERWRITE).setParallelism(1).uid("write-1");

env.execute("twitter-analytics");

person Gaurav    schedule 12.03.2018    source источник


Ответы (1)


Если вас не устраивают встроенные раковины, вы можете определить собственную раковину:

stream.addSink(new MyCustomSink ...)

MyCustomSink должен реализовать SinkFunction

Ваш пользовательский приемник будет содержать FileWriter и, например. счетчик. Каждый раз, когда приемник вызывается, он будет писать в "/path/to/file + counter.yourFileExtension"

https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.html< /а>

person Alex    schedule 12.03.2018
comment
Кроме того, было бы довольно легко сделать так, чтобы этот пользовательский приемник (например, PerRecordSink) обернул реальный приемник (S3 и т. д.), чтобы вы получили эти реализации бесплатно. - person kkrugler; 12.03.2018
comment
@Alex, на самом деле, когда я пишу собственный приемник и помещаю /path/to/file + counter.yourFileExtension в функцию invoke() пользовательского приемника, тогда для каждой записи создается новый файл. Но мое требование - создавать новый файл для каждого запуска окна. Означает, что в одном окне может быть несколько записей, и я хочу, чтобы все записи были в одном файле. Надеюсь, ты это понял. Пожалуйста, предложите. - person Gaurav; 13.03.2018
comment
@Gaurav да, приемник вызывается каждый раз, когда окно создает запись. Таким образом, это вопрос того, сколько записей выдает ваше окно. Поэтому вы должны переделать свою функцию так, чтобы она испускала, например. единый массив записей в конце, который сразу будет записан в файл. - person Alex; 13.03.2018
comment
@Alex, я написал пользовательскую функцию process() для окна. Эта функция процесса берет все элементы из Iterable, добавляет их в локальную строку и, наконец, добавляет эту строку в сборщик функции процесса. И в пользовательской функции раковины(). Я создаю новый файл в функции invoke(), используя counter. Результат: Теперь снова создается новый файл для каждой записи вместо целых данных окна. Это означает, что когда данные достигают Sink, Sink не знает границы данных каждого окна. Если хотите, могу поделиться кодом. - person Gaurav; 13.03.2018
comment
@Gaurav, если вы можете предоставить minimal пример, чтобы воспроизвести проблему, вы можете это сделать. - person Alex; 13.03.2018
comment
@Alex, Алекс, спасибо, на самом деле я пробовал много вещей и наконец нашел решение. На самом деле, я использовал поток с ключами, поэтому мой исходный поток разбивался на несколько потоков с логическими ключами (= количество ключей на окно), и для каждого потока с логическими ключами вызывалась функция вызова функции приемника. Итак, я использовал функцию windowAll() Flink и объединил все потоки с ключами в один поток и, следовательно, перешел к приемнику. Это сработало. Но да, мне пришлось использовать пользовательскую функцию приемника, как вы предложили. Спасибо еще раз. - person Gaurav; 14.03.2018
comment
@Gaurav рад, что смог помочь. Вы также можете принять ответ и проголосовать. - person Alex; 14.03.2018
comment
@Alex, я проголосовал за ваш ответ, но ему нужны и другие вещи, как я объяснил выше, функция windowAll () и т. д. Таким образом, это будет вводящим в заблуждение, если я приму ответ на этом этапе. Надеюсь, вы понимаете. Но большое спасибо, Алекс. Возможно, в будущем мы сможем больше обсудить FLINK. - person Gaurav; 14.03.2018
comment
@Алексей, хорошо понял. И если вы нашли вопрос логичным, вы тоже можете проголосовать за него. - person Gaurav; 14.03.2018