Можно ли запустить Mono параллельно и агрегировать результат

Я знаю, что можно связать Моно, например ...

Mono<String> resultAMono = loadA();
Mono<String> resultBMono = resultA.flatMap(resultA -> loadB());

Это будет цепочка, и resultBMono будет запущен, когда resultAMono вернется ....

Итак, мой вопрос: можно ли запустить 2 моно параллельно и когда оба возврата продолжатся с другим моно?

Думаю, это будет выглядеть примерно так ...

Mono<String> resultAMono = loadA();
Mono<String> resuktBMono = loadB();
Mono<Tuple2<Stirng, String> tupleMono = Mono.zip(resultAMono, resultBMono);

но я понятия не имею, что это будет работать в параллельном режиме или что я могу сделать, чтобы это работало параллельно ...

Спасибо за ответы ....


person posthumecaver    schedule 09.01.2018    source источник


Ответы (1)


2 семантики, 1 способ заставить их работать параллельно

Оба варианта, которые я представляю ниже, нуждаются в некоторой дополнительной настройке, чтобы заставить A и B Mono работать параллельно: а именно, каждый Mono должен использовать subscribeOn(Scheduler) для выхода из общего потока, из которого они объединены.

Если вас интересует только завершение A и B

Используйте when для прослушивания завершения A и B и then, чтобы продолжить с совершенно другим Mono:

Mono.when(monoAwithSubscribeOn, monoBwithSubscribeOn)
    .then(Mono.just("A and B finished, I don't know their value"));

Если вам важны значения A и B

Используйте zip + _9 _ / _ 10_ в зависимости от того, что вы хотите сделать с результатом.

Mono.zip(monoAwithSubscribeOn, monoBwithSubscribeOn)
    .map(tuple2 -> new Foo(tuple2.getT1(), tuple2.getT2(), "bar");

or

Mono.zip(monoAwithSubscribeOn, monoBwithSubscribeOn)
    .flatMap(tuple2 -> fetchMoreDataAsMono(tuple2.getT1(), tuple2.getT2()));

then будет игнорировать предыдущие данные, поэтому использовать zip перед ним не имеет особого смысла.

Кроме того, zip приведет к пустому Mono, если один из A или B пуст! Используйте _17 _ / _ 18_ для защиты от этого случая.

person Simon Baslé    schedule 10.01.2018
comment
++ за подсказку subscribeOn(Scheduler)! - person Hartmut; 31.07.2018