RxJava — ConnectableObservable, отключение и повторное подключение

Я пытаюсь воспроизвести пример кода из раздела «Отключение» здесь.

Отключение

Как мы видели в сигнатуре подключения, этот метод возвращает Subscription, как это делает Observable.subscribe. Вы можете использовать эту ссылку для прекращения подписки ConnectableObservable. Это предотвратит передачу событий наблюдателям, но не отменит их подписку на ConnectableObservable. Если вы снова вызовете connect, ConnectableObservable запустит новую подписку, а старые наблюдатели снова начнут получать значения.

ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
Subscription s = connectable.connect();

connectable.subscribe(i -> System.out.println(i));

Thread.sleep(1000);
System.out.println("Closing connection");
s.unsubscribe();

Thread.sleep(1000);
System.out.println("Reconnecting");
s = connectable.connect();

Выход

0
1
2
3
4
Closing connection
Reconnecting
0
1
2
...

Используя RxJava 2.0.8, у меня есть:

    ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
    Disposable s = connectable.connect();

    connectable.subscribe(new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable d) {
            
        }

        @Override
        public void onNext(Long aLong) {
            Log.d("test", "Num: " + aLong);
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    });

    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    Log.d("test", "Closing connection");
    s.dispose();

    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    Log.d("test", "Reconnecting...");
    connectable.connect();

Выход

Num: 0
Num: 1
Num: 2
Num: 3
Num: 4
Closing connection
Reconnecting...

Заранее спасибо....


person veritas1    schedule 15.04.2017    source источник
comment
Я не уверен, что понимаю вашу проблему   -  person Cochi    schedule 15.04.2017
comment
@Cochi в моем коде, мой подписчик не получает значения после того, как подключаемый источник был отключен, а затем снова подключен.   -  person veritas1    schedule 15.04.2017


Ответы (1)


Кажется, это поведение не было принято RxJava. Рабочий пример взят из Rx.NET. См. https://github.com/ReactiveX/RxJava/issues/4771.

person veritas1    schedule 15.04.2017
comment
Вероятно, да. Единственный раз, когда я могу заставить его работать, это когда я снова subscribe() перед повторным вызовом connect(). Это то, как это должно работать после вызова dispose()? - person Wahib Ul Haq; 26.12.2017