Количество выполнений flatMap зависит от наблюдателя, в то время как требуется одиночное

У меня есть механизм push-уведомлений (PublishSubject), который запускает логику HTTP-запроса (flatMap). Базовый сценарий заключается в том, что всякий раз, когда приходит push, выполняется один HTTP-вызов, и результаты распространяются на несколько наблюдателей.

Я написал простую демонстрацию для этого случая, но flatMap выполняется для каждого зарегистрированного наблюдателя, в то время как я хотел бы, чтобы он запускался только один раз при каждом нажатии.

PublishSubject<Integer> subject = PublishSubject.create();
Observable<String> obs = subject.asObservable().flatMap(integer -> {
    // this code runs for each observer, which is twice in this case
    return Observable.just(String.valueOf(integer));
});


Observer mock = mock(Observer.class);
Observer mock1 = mock(Observer.class);
obs.subscribe(mock);
obs.subscribe(mock1);

subject.onNext(1);

Не могли бы вы предложить исправление?

Спасибо

P.S. Прямо сейчас я использую cache(1) для решения проблемы, но я не уверен, что это нормально. Более того, я не совсем понимаю, почему один поток выполнения будет зависеть от количества подключенных наблюдателей. Вы можете это прокомментировать?


person midnight    schedule 06.07.2015    source источник


Ответы (1)


Вы уже используете его, вам нужно либо publish() + connect(), либо publish().refCount(), если вы хотите сделать значение доступным для нескольких наблюдаемых. Первый случай позволяет вам контролировать, когда на самом деле сделать Observable популярным, в то время как второй заработает, как только вы подпишетесь в первый раз. RxJS также имеет share, который обертывает publish().refCount(), не уверен, что RxJava также имеет это.

PublishSubject<Integer> subject = PublishSubject.create();
Observable<String> obs = subject.asObservable().flatMap(integer -> {
    // this code runs for each observer, which is twice in this case
    return Observable.just(String.valueOf(integer));
}).publish().refCount();


Observer mock = mock(Observer.class);
Observer mock1 = mock(Observer.class);
obs.subscribe(mock);
obs.subscribe(mock1);

subject.onNext(1);
person paulpdaniels    schedule 06.07.2015
comment
Да, в RxJava тоже есть .share() - person Dave Moten; 06.07.2015