RxJava 2.0 - обработка ресурсов для неперехваченной ошибки подписки в publish (). RefCount ()

Я новичок в RxJava и, как и многие другие, пытаюсь разобраться в обработке исключений. Я прочитал довольно много сообщений в Интернете (например, это обсуждение здесь как обрабатывать исключения, созданные наблюдателем onNext) и думаю, что я получил основное представление о концепциях.

В вышеупомянутом обсуждении на одном из плакатов говорится, что когда в подписчике генерируется исключение, RxJava выполняет следующие действия:

Внедрите универсальную обработку для регистрации сбоя и прекращения отправки ему событий (любого типа), а также для очистки любых ресурсов, связанных с этим подписчиком, и продолжите работу с любыми оставшимися подписками.

Это также более или менее то, что я вижу, единственное, с чем у меня проблемы, - это бит «очистить все ресурсы». Чтобы прояснить это, давайте рассмотрим следующий пример:

Я хочу создать Observable, который прослушивает источник асинхронных событий (например, очередь JMS) и onNext () для каждого полученного сообщения. Итак, в (псевдо) коде я бы сделал что-то подобное:

Observable<String> observable = Observable.create( s -> {
  createConnectionToBroker();
  getConsumer().setMessageListener(message -> s.onNext(transform(message)));
  s.setDisposable(new Disposable() {
    public void dispose() {
      tearDownBrokerConnection();
    }
  });
});

Поскольку я хочу повторно использовать прослушиватель сообщений для многих подписчиков / наблюдателей, я не подписываюсь напрямую на созданный Observable, а вместо этого использую команду publish (). RefCount (). Что-то похожее на это:

Observable<String> observableToSubscribeTo = observable.publish().refCount();

Disposable d1 = observableToSubscribeTo.subscribe(s -> ...);
Disposable d2 = observableToSubscribeTo.subscribe(s -> ...);

Все работает так, как ожидалось. Код подключается к JMS только при установке первой подписки, а соединение с брокером закрывается, когда последним наблюдателем был dispose()d.

Однако, когда подписчик выдает исключение, когда он onNext()ed, все становится беспорядочно. Как и ожидалось, бросивший наблюдатель получает ядерное оружие, и всякий раз, когда публикуется новое событие, он больше не получает уведомления. Моя проблема заключается в том, что, когда все оставшиеся подписчики dispose()d, Observable, поддерживающий соединение с брокером сообщений, больше не уведомляется. Мне кажется, что подписчик, вызвавший исключение, находится в каком-то зомби-состоянии. Когда дело доходит до распространения событий, он игнорируется, но каким-то образом предотвращает получение корневого Observable уведомления, когда последний подписчик - dispose()d.

Я понимаю, что RxJava ожидает, что наблюдатели не выбрасывают, а правильно обрабатывают возможное исключение. К сожалению, в случае, когда я хочу предоставить библиотеку, которая возвращает вызывающему объекту Observable, у меня нет никакого контроля над своими подписчиками. Это означает, что я никогда не смогу защитить свою библиотеку от глупых наблюдателей.

Итак, я спрашиваю себя: я что-то упустил? Неужели нет шанса нормально убрать при выкидывании подписчика? Это ошибка или я просто не разбираюсь в библиотеке?

Любые идеи очень ценятся!


person Oliver Dotzauer    schedule 06.02.2017    source источник


Ответы (1)


Если бы вы могли показать несколько модульных тестов, демонстрирующих проблему (без использования JMS), это было бы здорово.

Кроме того, onNext в RxJava 2 никогда не должен бросать; если это так, то это неопределенное поведение. Если вы не доверяете своим потребителям, у вас может быть трансформатор, наблюдаемый за концом, который выполняет safeSubscribe вместо простого subscribe, который добавляет защиту от неправильного поведения нижестоящего:

.compose(o -> v -> o.safeSubscribe(v))

or

.compose(new ObservableTransformer<T>() {
    @Override public Observable<T> apply(final Observable<T> source) {
        return new Observable<T>() {
            @Override public void subscribeActual(Observer<? super T> observer) {
                 source.safeSubscribe(observer);
            }
        };
    }
})
person akarnokd    schedule 10.02.2017
comment
привет @akarnokd, спасибо, что разобрались с этим! Я просто написал базовый модульный тест, чтобы продемонстрировать поведение, просто чтобы узнать, что я больше не могу его воспроизвести?!?! : -o Довольно неловко, я должен сказать :-( Так что я могу просто предположить, что проблема была где-то в моем клеевом коде. Извиняюсь за потраченное время !!! И спасибо за комментарии относительно safeSubscribe (). Поскольку как библиотека вы никогда нельзя доверять используемому коду, я бы очень хотел его укрепить.Один плохо себя ведет подписчик не должен иметь никаких побочных эффектов на других хорошо себя ведет. - person Oliver Dotzauer; 12.02.2017