Я новичок в RxJava и, как и многие другие, пытаюсь разобраться в обработке исключений. Я прочитал довольно много сообщений в Интернете (например, это обсуждение здесь как обрабатывать исключения, созданные наблюдателем onNext) и думаю, что я получил основное представление о концепциях.
В вышеупомянутом обсуждении на одном из плакатов говорится, что когда в подписчике генерируется исключение, RxJava выполняет следующие действия:
Внедрите универсальную обработку для регистрации сбоя и прекращения отправки ему событий (любого типа), а также для очистки любых ресурсов, связанных с этим подписчиком, и продолжите работу с любыми оставшимися подписками.
Это также более или менее то, что я вижу, единственное, с чем у меня проблемы, - это бит «очистить все ресурсы». Чтобы прояснить это, давайте рассмотрим следующий пример:
Я хочу создать Observable, который прослушивает источник асинхронных событий (например, очередь JMS) и onNext () для каждого полученного сообщения. Итак, в (псевдо) коде я бы сделал что-то подобное:
Observable<String> observable = Observable.create( s -> {
createConnectionToBroker();
getConsumer().setMessageListener(message -> s.onNext(transform(message)));
s.setDisposable(new Disposable() {
public void dispose() {
tearDownBrokerConnection();
}
});
});
Поскольку я хочу повторно использовать прослушиватель сообщений для многих подписчиков / наблюдателей, я не подписываюсь напрямую на созданный Observable, а вместо этого использую команду publish (). RefCount (). Что-то похожее на это:
Observable<String> observableToSubscribeTo = observable.publish().refCount();
Disposable d1 = observableToSubscribeTo.subscribe(s -> ...);
Disposable d2 = observableToSubscribeTo.subscribe(s -> ...);
Все работает так, как ожидалось. Код подключается к JMS только при установке первой подписки, а соединение с брокером закрывается, когда последним наблюдателем был dispose()
d.
Однако, когда подписчик выдает исключение, когда он onNext()
ed, все становится беспорядочно. Как и ожидалось, бросивший наблюдатель получает ядерное оружие, и всякий раз, когда публикуется новое событие, он больше не получает уведомления. Моя проблема заключается в том, что, когда все оставшиеся подписчики dispose()
d, Observable, поддерживающий соединение с брокером сообщений, больше не уведомляется. Мне кажется, что подписчик, вызвавший исключение, находится в каком-то зомби-состоянии. Когда дело доходит до распространения событий, он игнорируется, но каким-то образом предотвращает получение корневого Observable уведомления, когда последний подписчик - dispose()
d.
Я понимаю, что RxJava ожидает, что наблюдатели не выбрасывают, а правильно обрабатывают возможное исключение. К сожалению, в случае, когда я хочу предоставить библиотеку, которая возвращает вызывающему объекту Observable, у меня нет никакого контроля над своими подписчиками. Это означает, что я никогда не смогу защитить свою библиотеку от глупых наблюдателей.
Итак, я спрашиваю себя: я что-то упустил? Неужели нет шанса нормально убрать при выкидывании подписчика? Это ошибка или я просто не разбираюсь в библиотеке?
Любые идеи очень ценятся!