RxJava отказывается от одного наблюдателя от другого

Предполагая следующий сценарий, что было бы лучшим/рекомендуемым решением проблемы?

У меня есть два потока, один из которых представляет TCP-соединение, а другой представляет состояние этого TCP-соединения. Как только статус изменится (т.е. отключен), я хотел бы повторно установить TCP-соединение.

Моя первоначальная идея состояла в том, чтобы иметь эти 2 потока, объединить их и применить retryWith к результирующему Observable. Второй поток — это экземпляр PublishSubject, что дает мне очень удобный способ отказа. Теперь эта идея частично работает, за исключением того факта, что когда я вызываю onError() для издателя, поток соединения (№1) продолжает подписку/отмену подписки до тех пор, пока не исчерпается лимит, установленный retryWhen.

Я уверен, что эта проблема, должно быть, была решена в прошлом, когда вы хотите поддерживать TCP-соединение в рабочем состоянии, я просто не знаю, как двигаться дальше. Любая помощь будет оценена по достоинству.


person travikk    schedule 12.09.2015    source источник
comment
Обычно TCP-соединение не просто закрывается, если этого не хочет протокол более высокого уровня. TCP уже имеет некоторые механизмы исправления ошибок, и все, что он не может исправить сам по себе, обычно представляет собой реальную сетевую проблему. Вы уверены, что не хотите полагаться здесь на зрелые механизмы более низкого уровня?   -  person Sebastian S    schedule 12.09.2015
comment
@SebastianS, если произойдет сбой в сети, TCP отключится. Насколько я вижу, в этой конкретной реализации он не переподключается после того, как сеть снова становится доступной.   -  person travikk    schedule 12.09.2015


Ответы (2)


Два потока не нужны. Просто используйте один. Типичная конструкция будет включать Observable.using() для создания сокета, установления наблюдаемого в этом сокете и обработки закрытия сокета, а затем связывания этого с retry() (обычно с задержкой).

person Dave Moten    schedule 12.09.2015
comment
Как Observable узнает, когда нужно повторить попытку? Можете ли вы привести пример? - person travikk; 13.09.2015

 return Observable.using(new Func0<XMPPTCPConnection>() {
        @Override
        public XMPPTCPConnection call() {
            L.log("creating connection");
            connection = _createConnection();
            return connection;
        }
    }, new Func1<XMPPTCPConnection, Observable<?>>() {
        @Override
        public Observable<?> call(final XMPPTCPConnection connection) {

            try {
                _authenticate(connection, username, password);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }

            return Observable.just(connection)
                    .repeatWhen(new RepeatWhenOperator(-1, 1000))
                    .map(new Func1<XMPPTCPConnection, Object>() {
                        @Override
                        public Object call(XMPPTCPConnection connection) {
                            if (!connection.isConnected()) {
                                throw new RuntimeException("Disconnected");
                            }

                            return null;
                        }
                    });
        }
    }, new Action1<XMPPTCPConnection>() {
        @Override
        public void call(XMPPTCPConnection connection) {
            L.log("disposing");
            _disconnect();
        }
    })
            .subscribeOn(Schedulers.io())
            .unsubscribeOn(Schedulers.io())
            .retry();

Итак, на основе Observable.using() я придумал это решение. Это работает, но я не доволен решением - то есть опрос каждую секунду, живо ли соединение или нет. XMPPTCPConnection предоставляет мне прослушиватель, когда соединение обрывается, это кажется лучшим решением - просто не уверен, как это включить...

Метод _disconnect() на самом деле предназначен для отключения и удаления; это может быть плохим именем в текущих обстоятельствах, но отключение, которое вызывает исключение RuntimeException, происходит за пределами этой системы.

Любые идеи или улучшения были бы очень признательны!

person travikk    schedule 12.09.2015
comment
_disconnect должен каким-то образом ссылаться на соединение. Я предполагаю, что вы намеренно оставили какой-то код. Что касается раздела RepeatWhen, я не ожидал увидеть его там. Я не могу комментировать XMPPTCPConnection, потому что я с ним не знаком, но с обычными TCP-соединениями в дополнение к повторной попытке с задержкой я использую .timeout, чтобы только если соединение было тихим, мы снова подключались через определенное время. Для этого просто поместите вызов .timeout непосредственно перед последним вызовом .retry(). - person Dave Moten; 13.09.2015
comment
@DaveMoten ах, я вижу, и я предполагаю, что вы периодически получаете пинг, который затем выдает как элемент; если пинг не проходит, это будет означать, что соединение не работает - я правильно понимаю? Спасибо - person travikk; 13.09.2015
comment
Да, можно выполнить ping и отфильтровать его после .timeout() или просто полагаться на обычный трафик, если соединение достаточно загружено. - person Dave Moten; 13.09.2015