есть ли способ заставить Flux, сгенерированный groupBy (), завершиться через определенный период времени (или аналогичным образом ограничить максимальное количество «открытых» групп) независимо от полноты восходящего потока? У меня примерно следующее:
Flux<Foo> someFastPublisher;
someFastPublisher
.groupBy(f -> f.getKey())
.delayElements(Duration.ofSeconds(1)) // rate limit each group
.flatMap(g -> g) // unwind the group
.subscribe()
;
и я сталкиваюсь с ситуацией, когда Flux зависает, поскольку предполагается, что количество групп больше, чем параллелизм flatMap
. Я мог бы увеличить flatMap
параллелизм, но нет простого способа определить максимально возможный размер. Вместо этого я знаю, что Foo
, сгруппированные по Foo.key
, будут близки друг к другу по времени / порядку публикации, и предпочел бы использовать какое-то временное окно в groupBy Flux по сравнению с параллелизмом flatMap (и в конечном итоге с двумя разными группами w / тот же key()
не имеет большого значения).
Я предполагаю, что groupBy
Flux не будет onComplete, пока someFastPubisher
onCompletes - т.е. поток, переданный flatMap
, просто останется «открытым» (хотя они вряд ли когда-либо получат новое событие).
Я могу обойти это либо путем предварительной выборки Integer.MAX
в groupBy, либо Integer.MAX
одновременного выполнения - но есть ли способ контролировать «жизнь» группы?