Reactor 3.x - ограничить время группы By Flux

есть ли способ заставить Flux, сгенерированный groupBy (), завершиться через определенный период времени (или аналогичным образом ограничить максимальное количество «открытых» групп) независимо от полноты восходящего потока? У меня примерно следующее:

Flux<Foo> someFastPublisher;

someFastPublisher
  .groupBy(f -> f.getKey())
  .delayElements(Duration.ofSeconds(1)) // rate limit each group
  .flatMap(g -> g) // unwind the group
  .subscribe()
;

и я сталкиваюсь с ситуацией, когда Flux зависает, поскольку предполагается, что количество групп больше, чем параллелизм flatMap. Я мог бы увеличить flatMap параллелизм, но нет простого способа определить максимально возможный размер. Вместо этого я знаю, что Foo, сгруппированные по Foo.key, будут близки друг к другу по времени / порядку публикации, и предпочел бы использовать какое-то временное окно в groupBy Flux по сравнению с параллелизмом flatMap (и в конечном итоге с двумя разными группами w / тот же key() не имеет большого значения).

Я предполагаю, что groupBy Flux не будет onComplete, пока someFastPubisher onCompletes - т.е. поток, переданный flatMap, просто останется «открытым» (хотя они вряд ли когда-либо получат новое событие).

Я могу обойти это либо путем предварительной выборки Integer.MAX в groupBy, либо Integer.MAX одновременного выполнения - но есть ли способ контролировать «жизнь» группы?


person jamey graham    schedule 19.12.2017    source источник


Ответы (1)


да: вы можете применить take(Duration) к группам, чтобы гарантировать, что они закроются раньше, и после этого откроется новая группа с тем же ключом:

source.groupBy(v -> v.intValue() % 2)
      .flatMap(group -> group
              .take(Duration.ofMillis(1000))
              .count()
              .map(c -> "group " + group.key() + " size = " + c)
      )
      .log()
      .blockLast();
person Simon Baslé    schedule 20.12.2017
comment
я доверяю, это работает, учитывая, кто ответил на это :) но мне трудно представить себе, что происходит. take() вызывает ли groupedBy Flux завершение после продолжительности? И / или как это передается вверх по течению в GroupedFlux? - person jamey graham; 03.01.2018
comment
неважно - думаю, я понял. Дело не в том, что передается сигнал о группе upstream groupBy - дело в том, что flatMap видит take() Flux (копию), а не GroupedFlux Flux. - person jamey graham; 03.01.2018
comment
Я бы предпочел groupBy метод, который принимает аргумент Duration в качестве синтаксического сахара. Без намека на такой подход сложно прийти. - person Oleg Kurbatov; 28.06.2018