RxJava2: 2 отдельных наблюдаемых вывода и объединенный вывод одних и тех же наблюдаемых различаются

Snippet1 , я вижу sysout от обоих подписчиков.
Snippet2 , я не вижу вывода от второго наблюдаемого.
Почему у меня не работает слияние?

Фрагмент1

x = createQ2Flowable().subscribeOn(Schedulers.computation())
        .observeOn(Schedulers.io())
        .filter(predicate -> !predicate.toString().contains("<log realm=\"\""))
        .subscribe(onNext -> System.out.println("Q2->" + onNext));

y = createMetricsFlowable().subscribeOn(Schedulers.computation())
        .observeOn(Schedulers.io())
        .subscribe(onNext -> System.out.println("metrics->" + onNext));  

Фрагмент 2

createQ2Flowable().mergeWith(createMetricsFlowable())
.subscribeOn(Schedulers.computation())
.subscribe(onNext -> System.out.println(onNext));  

[править]: добавлены потоковые создатели

private Flowable<String> createMetricsFlowable() {
    return Flowable.create(source -> {
        Space sp = SpaceFactory.getSpace("rxObservableFeeder");
        while (running()) {
            String line = (String) sp.in("RXTmFeeder");
            source.onNext(line);
        }

    }, BackpressureStrategy.BUFFER);

}

private Flowable<String> createQ2Flowable() {
    return Flowable.create(source -> {
        Space sp = SpaceFactory.getSpace("LoggerSpace");
        while (running()) {
            LogEvent line = (LogEvent) sp.in("rxLoggingKey");
            source.onNext(line.toString());

        }

    }, BackpressureStrategy.BUFFER);

}

person chhil    schedule 10.01.2018    source источник
comment
Предоставьте реализацию этих createX() Flowables. Также попробуйте createQ2Flowable().subscribeOn(Schedulers.computation()).mergeWith(createMetricsFlowable().subscribeOn(Schedulers.computation())) в Snippet2.   -  person akarnokd    schedule 10.01.2018
comment
Попробовал вашу рекомендацию, сработало. Теперь мне нужно знать, почему это произошло. Спасибо.   -  person chhil    schedule 10.01.2018
comment
Я также получаю неполные sysouts, когда добавляю наблюдение непосредственно перед подпиской.   -  person chhil    schedule 10.01.2018


Ответы (1)


Из комментариев:

пытаться

createQ2Flowable()
.subscribeOn(Schedulers.computation())      // <-------------------------
.mer‌​geWith(createMetrics‌​Flowable()
    .subscribe‌​On(Schedulers.comput‌​ation())  // <-------------------------
)

Теперь мне нужно знать, почему это произошло

Учитывая подробную реализацию, у вас есть два синхронных Flowable. Когда вы их объединяете, первый Flowable подписывается и начинает излучать немедленно и никогда не возвращает управление mergeWith, поэтому второй Flowable никогда не подписывается.

subscribeOn после mergeWith не эквивалентно приведенному выше решению. Вы должны явно подписать оба Flowable на фоновый поток, чтобы mergeWith мог подписаться на второй Flowable после того, как синхронный цикл был перемещен из потока, который mergeWith использует для подписки на свои источники.

person akarnokd    schedule 10.01.2018
comment
Perfect , каждый поток со своим собственным subscribeOn имеет смысл. Проблема наблюдателя перед подпиской также была решена с этим. Я также неправильно поместил второй subscribeOn после слияния, а не в createMetrics‌​Flowable(), ошибка брекетинга. Спасибо за помощь в решении этой проблемы. - person chhil; 10.01.2018