Апач Флинк. Окна с водяными знаками

Я пытаюсь агрегировать 60-секундные данные с указанием минутной метки времени с максимальной задержкой 30 секунд.

DataStream<OHLChelp> ohlcAggStream = stockStream.assignTimestampsAndWatermarks(new TimestampExtractor(Time.seconds(30))).map(new mapStockToOhlcHelp()).keyBy((KeySelector<OHLChelp, Long>) o -> o.getMinTime())
            .timeWindow(Time.seconds(60))
            .reduce(new aggregateOHLC());
//map complex object to simpler one
DataStream<OHLCmodel> ohlcStremAggregated = ohlcAggStream.map(new mapOHLCredToOHLCfin());
//log ohlc stream
ohlcStreamAggregated.writeAsText(outLogPath);

Я получаю данные. Устанавливаются водяные знаки и временные метки. Кажется, агрегированные данные никогда не отправляются в ohlcStreamAggregated и поэтому не регистрируются.

public TimestampExtractor(Time maxDelayInterval) {
        if (maxDelayInterval.toMilliseconds() < 0) {
            throw new RuntimeException("This parameter must be positive or 0.);
        }
        this.maxDelayInterval = maxDelayInterval.toMilliseconds() / 1000;
        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxDelayInterval;
    }

@Override
public final Watermark getCurrentWatermark() {
        // set maximum delay 30 seconds
        long potentialWM = currentMaxTimestamp - maxDelayInterval;
        if (potentialWM > lastEmittedWM) {
            lastEmittedWM = potentialWM;
        }
        return new Watermark(lastEmittedWM);
    }
@Override
public final long extractTimestamp(StockTrade stockTrade, long previousElementTimestamp) {
        BigDecimal bd = new BigDecimal(stockTrade.getTime());
        long timestamp = bd.longValue();
        //set the maximum seen timestamp so far
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    } 

Я использовал этот пример в качестве шаблона.


person la_femme_it    schedule 17.07.2017    source источник


Ответы (1)


Было бы проще диагностировать ваше приложение, если бы вы могли поделиться им целиком (возможно, в общих чертах), но вы:

  • установите временную характеристику на время события (документы)?
  • вызов выполнения в среде выполнения потока?

Кроме того, ваш экстрактор временных меток может быть немного проще. Что-то вроде этого:

public static class TimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<StockTrade> {
    public TimestampExtractor() {
        super(Time.seconds(30));
    }

    @Override
    public long extractTimestamp(StockTrade trade) {
        return trade.getTime();
    }
}
person David Anderson    schedule 20.07.2017
comment
Да, я установил env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); и env.execute(Торги из темы Kafka); Поскольку первый поток сбора работает нормально, логирование и т. д., я думаю, что проблема в работе с окнами. Если бы вы могли отправить мне свой адрес электронной почты, мы могли бы обсудить это дальше. Весьма признателен. - person la_femme_it; 25.07.2017