Очередь независимых наблюдаемых

Можно ли поставить в очередь несколько наблюдаемых, чтобы они выполнялись друг за другом, не зная друг друга?

Допустим, у меня есть 3 разных Observables в 3 разных классах и/или потоках. Все они представляют разные запросы. Порядок их выполнения зависит от того, у какого наблюдаемого есть первый подписчик (чего мы не знаем)

Я хочу, чтобы они выполнялись друг за другом в какой-то очереди, и если какой-то 4-й наблюдаемый присоединяется к очереди, это должно быть добавлено в очередь и выполнено после 3-го.

это возможно с некоторыми инструментами ootb rx?


person andre    schedule 30.04.2015    source источник
comment
Возможно, я не совсем понимаю ваш вопрос, но не могли бы вы объяснить, почему один из различных комбинированных методов (например, Observable.merge() или Observable.combineLatest()) не соответствует вашим потребностям?   -  person dcsohl    schedule 30.04.2015


Ответы (2)


Насколько я понял из вашего вопроса, вы хотите построить Observable из Observable. Затем выполните Observable по порядку, используя concatMap

Observable<Observable<Result>> tasks = ... //build your Observable of Observable

tasks.concatMap(obs -> obs)
     .subscribe(resultOfNestedObservable -> /** getting result of Observable **/);

Чтобы проверить эту идею, попробуйте использовать Subject.

 Subject<Observable<Observable<Result>> source = PublishSubject.create(); 
 Observable<Observable<Result>> tasks = source;

 /*...*/
 source.onNext(observable1);
 source.onNext(observable2);
person dwursteisen    schedule 01.05.2015

Если я правильно понимаю проблему, у вас есть независимые наблюдаемые и независимые подписчики, но вы все равно хотите убедиться, что только один наблюдаемый «выполняется». Это может быть достигнуто с помощью пользовательского планировщика:

ExecutorService exec = Executors.newFixedThreadPool(1, r -> {
    Thread t = new Thread(r);
    t.setDaemon(true);
    return t;
});
Scheduler singleThread = Schedulers.from(exec);

Observable o1 = someObservable.subscribeOn(singleThread);
Observable o2 = someObservable.subscribeOn(singleThread);
Observable o3 = someObservable.subscribeOn(singleThread);

o2.subscribe();
o1.subscribe();
o3.subscribe();

У ExecutorService есть своя очередь, и кто бы ни подписался первым, остальные останутся в этой очереди, пока не завершится предыдущая.

person akarnokd    schedule 04.05.2015
comment
Попробую, звучит убедительно... Буду держать вас в курсе - person andre; 04.05.2015
comment
Там опечатка... Исполнители должны быть Исполнителями :) ExecutorService exec = Executors.newFixedThreadPool(1, r -> { Thread t = new Thread(r); t.setDaemon(true); return t; }); - person isuPatches; 03.11.2015