RxJava Асинхронное устранение дребезга

Можно ли, используя операции RxJava, «игнорировать» элементы, которые были обработаны в нисходящем потоке, если восходящий поток создает новые элементы?

Например

Observable.create(...)
  .flatMap(...) // 30 seconds to process (Asynchronous)
  .flatMap(...) // 30 seconds to process (Asynchronous)
  etc...

что мне нужно добиться, так это то, что если восходящий поток уведомит о новом элементе, он отменит любую операцию в потоке ниже, который в настоящее время выполняется, даже если нисходящий поток делает что-то в другом асинхронном потоке.


person Ido Kahana    schedule 26.04.2018    source источник


Ответы (1)


Вместо оператора flatMap() можно использовать switchMap(). Когда новое значение получено в switchMap(), старая наблюдаемая отменяется и подписывается заменяющая наблюдаемая.

Observable.create( ... )
  .switchMap( value -> getObservable1( value )
                         .switchMap( value2 -> getObservable2( value2 ) )
  ...

Если вы хотите, чтобы наблюдатели, расположенные ниже по течению, также были отменены, вам придется распространить switchMap(). В приведенном выше коде эмиссия на первом этапе цепочки наблюдателя будет отписываться от getObservable1( value ) и от getObservable2( value2 ).

person Bob Dalgleish    schedule 26.04.2018