RxJava Асинхронно отстраняване на отскок

Използвайки операции на RxJava, възможно ли е да "игнорирате" елементи, които са били обработени в низходящия поток, ако възходящият поток излъчва нови елементи?

например

Observable.create(...)
  .flatMap(...) // 30 seconds to process (Asynchronous)
  .flatMap(...) // 30 seconds to process (Asynchronous)
  etc...

това, което трябва да постигна, е, че уведомяването нагоре за новия елемент ще отмени всяка операция в потока отдолу, която се изпълнява в момента, дори ако потокът надолу прави нещо в друга нишка асинхронно.


person Ido Kahana    schedule 26.04.2018    source източник


Отговори (1)


Вместо оператор flatMap() можете да използвате switchMap(). Когато се получи нова стойност в switchMap(), старата наблюдаема се отписва и заместващата наблюдаема се абонира.

Observable.create( ... )
  .switchMap( value -> getObservable1( value )
                         .switchMap( value2 -> getObservable2( value2 ) )
  ...

Ако искате наблюдателите по-надолу по веригата също да бъдат отменени, ще трябва да разпространите switchMap(). В кода по-горе, емисия в първия етап от веригата на наблюдателя ще се отпише от getObservable1( value ) и от getObservable2( value2 ).

person Bob Dalgleish    schedule 26.04.2018