Использование методов Mono с динамическими параметрами

У меня есть функция, которая возвращает ввод как Mono:

public static Mono<Integer> emitter(int param){
    return Mono.just(param)
            .delayElement(Duration.ofMillis(100)); //delay to simulate http response
}

Я хотел бы вызвать эмиттер один раз с начальным значением 3, но затем повторить его, пока не будет достигнут определенный размер. Эта логика повторения должна быть в основном методе, поэтому я не могу изменить emitter().

public static void main(String[] args){
    int maxSize = 5;
    int initial = 3;
    Mono<Integer> response = emitter(initial);

    response
        .doOnNext(s -> {
            System.out.println("need more!");
        })
        .subscribe();
}

Одно наивное решение:

public static void main(String[] args){
    int maxSize = 5;
    int initial = 3;

    for(int i = 0; i < 999; i++) {
        Mono<Integer> response = emitter(initial+i);
        Mono<Boolean> isDone = response
                .flatMap(elem -> {
                    if(elem < maxSize) {
                        System.out.println("need more!");
                        return Mono.just(false);
                    } else {
                        System.out.println("ok done!");
                        return Mono.just(true);
                    }
                });
        if(isDone.block())
            break;
    }
}

По сути, я пытаюсь создать еще один Mono с динамическими параметрами на основе результата предыдущего Mono. Я знаю, что Mono / Flux неизменны ... Есть ли аккуратный и реактивный способ сделать это? Я пробовал такие вещи, как Flux.range(0, Integer.MAX_VALUE).zipWith(myMono), чтобы попытаться передать параметры в эмиттер, но не смог заставить его работать.

PS. Я знаю, что мой пример не имеет особого смысла. Я попытался упростить свой реальный сценарий, который включает списки и Spring WebFlux (эмиттер).

Спасибо!

---РЕДАКТИРОВАТЬ

Хорошо, вот что я придумал:

public static void main(String[] args) throws InterruptedException {
    int maxSize = 5;
    int initial = 3;

    Flux.range(initial, 10)
            .delayElements(Duration.ofSeconds(1))  
            .flatMap(param -> emitter(param))
            .flatMap(it -> {
                if(it < maxSize) {
                    System.out.println("need more!: " + it);
                    return Mono.just(false);
                } else {
                    System.out.println("done!: " + it);
                    return Mono.just(true);
                }
             })
            .takeUntil(Boolean::booleanValue)
            .subscribe();

    Thread.sleep(6000);
}
need more!: 3
need more!: 4
done!: 5

Одна из проблем заключается в том, что если я не откладываю Flux.range, выполнение не будет выполняться по порядку, и возможно, что будет выведено больше или меньше операторов печати, чем ожидалось 3 строки.


person GbGbGbGbEB    schedule 03.09.2018    source источник
comment
у вас должна быть возможность заменить этот второй flatMap + takeUntil одним takeUntil, верно?   -  person Simon Baslé    schedule 04.09.2018
comment
Ага, тоже работает!   -  person GbGbGbGbEB    schedule 04.09.2018


Ответы (1)


Вы можете использовать функцию expand в Publisher, которая действует как рекурсия, например.

emitter(initial)
    .expand(i -> i < maxSize ? emitter(i + 1) : Mono.empty())
    .doOnNext(i -> System.out.println("i = " + i))
    .subscribe();
person Kevin Hussey    schedule 04.09.2018