У меня есть функция, которая возвращает ввод как Mono:
public static Mono<Integer> emitter(int param){
return Mono.just(param)
.delayElement(Duration.ofMillis(100)); //delay to simulate http response
}
Я хотел бы вызвать эмиттер один раз с начальным значением 3, но затем повторить его, пока не будет достигнут определенный размер. Эта логика повторения должна быть в основном методе, поэтому я не могу изменить emitter()
.
public static void main(String[] args){
int maxSize = 5;
int initial = 3;
Mono<Integer> response = emitter(initial);
response
.doOnNext(s -> {
System.out.println("need more!");
})
.subscribe();
}
Одно наивное решение:
public static void main(String[] args){
int maxSize = 5;
int initial = 3;
for(int i = 0; i < 999; i++) {
Mono<Integer> response = emitter(initial+i);
Mono<Boolean> isDone = response
.flatMap(elem -> {
if(elem < maxSize) {
System.out.println("need more!");
return Mono.just(false);
} else {
System.out.println("ok done!");
return Mono.just(true);
}
});
if(isDone.block())
break;
}
}
По сути, я пытаюсь создать еще один Mono с динамическими параметрами на основе результата предыдущего Mono. Я знаю, что Mono / Flux неизменны ... Есть ли аккуратный и реактивный способ сделать это? Я пробовал такие вещи, как Flux.range(0, Integer.MAX_VALUE).zipWith(myMono)
, чтобы попытаться передать параметры в эмиттер, но не смог заставить его работать.
PS. Я знаю, что мой пример не имеет особого смысла. Я попытался упростить свой реальный сценарий, который включает списки и Spring WebFlux (эмиттер).
Спасибо!
---РЕДАКТИРОВАТЬ
Хорошо, вот что я придумал:
public static void main(String[] args) throws InterruptedException {
int maxSize = 5;
int initial = 3;
Flux.range(initial, 10)
.delayElements(Duration.ofSeconds(1))
.flatMap(param -> emitter(param))
.flatMap(it -> {
if(it < maxSize) {
System.out.println("need more!: " + it);
return Mono.just(false);
} else {
System.out.println("done!: " + it);
return Mono.just(true);
}
})
.takeUntil(Boolean::booleanValue)
.subscribe();
Thread.sleep(6000);
}
need more!: 3 need more!: 4 done!: 5
Одна из проблем заключается в том, что если я не откладываю Flux.range
, выполнение не будет выполняться по порядку, и возможно, что будет выведено больше или меньше операторов печати, чем ожидалось 3 строки.
flatMap
+takeUntil
однимtakeUntil
, верно? - person Simon Baslé   schedule 04.09.2018