Согласно документации, withLatestFrom
отсутствует в реализации Java (что немного отличается от combineLatest
). Любая идея о том, как подражать этому?
RxJava: как эмулировать с помощью LatestFrom?
Ответы (2)
Учитывая a
в качестве основного наблюдаемого и b
в качестве наблюдаемого "последнего из", эта лямбда псевдо-java8 должна делать то, что вы хотите:
a.publish(a' -> b.switchMap(y -> a'.map(x -> x + y)))
Это сначала публикует a
как a'
, что позволяет многократно подписываться на него без перезапуска потока. Затем каждый раз, когда создается новый элемент из b
, он повторно подписывается на текущий поток a
, который объединяет последний вывод b
с каждым выводом a
.
Вы можете легко обернуть это как реализацию Transformer
RxJava, вот так (также полупсевдо, так что проверьте мой синтаксис):
public class WithLatestFrom<T, U, V> implements Transformer<T, V> {
private final Func2<T, U, V> function;
private final Observable<U> latest;
private WithLatestFrom<T, U, V>(final Observable<U> latest, Func2<T, U, V> function) {
this.function = function;
this.latest = latest;
}
public static <T, U, V> WithLatestFrom<T, U, V> with(
final Observable<U> latest, Func2<T, U, V> function) {
return new WithLatestFrom<T, U, V>(latest, function);
}
@Override
public Observable<V> call(final Observable<T> source) {
return source.publish((publishedSource) -> latest.switchMap((y) ->
publishedSource.map((x) -> function.call(x, y)));
}
}
Затем вы можете повторно использовать его в своем коде, например:
a.compose(WithLatestFrom.with(b, (x, y) -> x + y));
Некоторая очень простая и наивная реализация:
@SuppressWarnings("unchecked")
public static <T, U, V> Observable<T> combineLatestFrom(
Observable<U> o1,
Observable<V> o2,
Func2<U, V, T> f) {
final Object nothing = new Object();
return Observable.create(s -> {
AtomicReference<V> val2 = new AtomicReference<V>((V) nothing);
o1.subscribe(v -> {
val2.getAndUpdate(current -> {
if (current != nothing) {
s.onNext(f.call(v, current));
}
return current;
});
}, s::onError, s::onCompleted);
o2.subscribe(val2::set, s::onError);
});
}
И этот метод можно использовать так:
combineLatestFrom(numbers, letters, (n, l) -> n + l)
.subscribe(System.out::println);
Если здесь числа и буквы являются наблюдаемыми из мраморной диаграммы -> результат будет таким, как ожидалось.
val1
? Ваш пример можно упростить, поскольку val1
кажется бесполезным?
- person Renaud Cerrato; 19.02.2015