RxJava: как эмулировать с помощью LatestFrom?

Согласно документации, withLatestFrom отсутствует в реализации Java (что немного отличается от combineLatest). Любая идея о том, как подражать этому? введите здесь описание изображения


person Renaud Cerrato    schedule 18.02.2015    source источник
comment
Вероятно, дубликат stackoverflow.com/questions/27203435/   -  person Tomáš Dvořák    schedule 19.02.2015


Ответы (2)


Учитывая a в качестве основного наблюдаемого и b в качестве наблюдаемого "последнего из", эта лямбда псевдо-java8 должна делать то, что вы хотите:

a.publish(a' -> b.switchMap(y -> a'.map(x -> x + y)))

Это сначала публикует a как a', что позволяет многократно подписываться на него без перезапуска потока. Затем каждый раз, когда создается новый элемент из b, он повторно подписывается на текущий поток a, который объединяет последний вывод b с каждым выводом a.

Вы можете легко обернуть это как реализацию Transformer RxJava, вот так (также полупсевдо, так что проверьте мой синтаксис):

 public class WithLatestFrom<T, U, V> implements Transformer<T, V> {
   private final Func2<T, U, V> function;
   private final Observable<U> latest;

   private WithLatestFrom<T, U, V>(final Observable<U> latest, Func2<T, U, V> function) {
     this.function = function;
     this.latest = latest;
   }

   public static <T, U, V> WithLatestFrom<T, U, V> with(
       final Observable<U> latest, Func2<T, U, V> function) {
     return new WithLatestFrom<T, U, V>(latest, function);
   }

   @Override
   public Observable<V> call(final Observable<T> source) {
     return source.publish((publishedSource) -> latest.switchMap((y) ->
         publishedSource.map((x) -> function.call(x, y)));
   }

}

Затем вы можете повторно использовать его в своем коде, например:

a.compose(WithLatestFrom.with(b, (x, y) -> x + y));
person lopar    schedule 20.02.2015
comment
Кроме того, я только что узнал, что в 1.0.7 есть экспериментальный оператор withLatestFrom: github. com/ReactiveX/RxJava/releases/tag/v1.0.7 :) - person lopar; 22.02.2015
comment
Да, интересное решение в любом случае! Спасибо! - person Renaud Cerrato; 26.02.2015

Некоторая очень простая и наивная реализация:

@SuppressWarnings("unchecked")
public static <T, U, V> Observable<T> combineLatestFrom(
  Observable<U> o1,
  Observable<V> o2,
  Func2<U, V, T> f) {

    final Object nothing = new Object();
    return Observable.create(s -> {

        AtomicReference<V> val2 = new AtomicReference<V>((V) nothing);

        o1.subscribe(v -> {
          val2.getAndUpdate(current -> {
            if (current != nothing) {
              s.onNext(f.call(v, current));
            }
            return current;
          });   
        }, s::onError, s::onCompleted);

        o2.subscribe(val2::set, s::onError);

    });
}

И этот метод можно использовать так:

combineLatestFrom(numbers, letters, (n, l) -> n + l)
  .subscribe(System.out::println);

Если здесь числа и буквы являются наблюдаемыми из мраморной диаграммы -> результат будет таким, как ожидалось.

person meddle    schedule 19.02.2015
comment
Какова цель val1? Ваш пример можно упростить, поскольку val1 кажется бесполезным? - person Renaud Cerrato; 19.02.2015