Ниже приведена программа flink (Java), которая считывает твиты из файла, извлекает хэш-теги, подсчитывает количество повторений для каждого хэш-тега и, наконец, записывает в файл.
Теперь в этой программе есть скользящее окно размером 20 секунд, которое скользит на 5 секунд. В приемнике все выходные данные записываются в файл с именем outfile. Означает, что каждые 5 секунд одно окно запускается и записывает данные в файл outfile.
Моя проблема:
Я хочу, чтобы при каждом запуске окна (то есть каждые 5 секунд) данные записывались в новый файл. (вместо добавления в тот же файл). Подскажите где и как это можно сделать? Нужно ли использовать настраиваемый триггер или какую-либо конфигурацию приемника? или что-нибудь еще?
Код:
<!-- language: lang-java -->
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(100);
env.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
String path = "C:\\Users\\eventTime";
// Reading data from files of folder eventTime.
DataStream<String> streamSource = env.readFile(new TextInputFormat(new Path(path)), path, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000).uid("read-1");
//Extracting the hash tags of tweets
DataStream<Tuple3<String, Integer, Long>> mapStream = streamSource.map(new ExtractHashTagFunction());
//generating watermarks and extracting the timestamps from tweets
DataStream<Tuple3<String, Integer, Long>> withTimestampsAndWatermarks = mapStream.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
KeyedStream<Tuple3<String, Integer, Long>,Tuple> keyedStream = withTimestampsAndWatermarks.keyBy(0);
//Using sliding window of 20 seconds which slide by 5 seconds.
SingleOutputStreamOperator<Tuple4<String, Integer, Long, String>> aggregatedStream = keyedStream.**window(SlidingEventTimeWindows.of(Time.seconds(20),Time.seconds(5)))**
.aggregate(new AggregateHashTagCountFunction()).uid("agg-123");
aggregatedStream.writeAsText("C:\\Users\\outfile", WriteMode.NO_OVERWRITE).setParallelism(1).uid("write-1");
env.execute("twitter-analytics");