Я пытаюсь воспроизвести пример кода из раздела «Отключение» здесь.
Отключение
Как мы видели в сигнатуре подключения, этот метод возвращает Subscription, как это делает Observable.subscribe. Вы можете использовать эту ссылку для прекращения подписки ConnectableObservable. Это предотвратит передачу событий наблюдателям, но не отменит их подписку на ConnectableObservable. Если вы снова вызовете connect, ConnectableObservable запустит новую подписку, а старые наблюдатели снова начнут получать значения.
ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
Subscription s = connectable.connect();
connectable.subscribe(i -> System.out.println(i));
Thread.sleep(1000);
System.out.println("Closing connection");
s.unsubscribe();
Thread.sleep(1000);
System.out.println("Reconnecting");
s = connectable.connect();
Выход
0
1
2
3
4
Closing connection
Reconnecting
0
1
2
...
Используя RxJava 2.0.8, у меня есть:
ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
Disposable s = connectable.connect();
connectable.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.d("test", "Num: " + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.d("test", "Closing connection");
s.dispose();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.d("test", "Reconnecting...");
connectable.connect();
Выход
Num: 0
Num: 1
Num: 2
Num: 3
Num: 4
Closing connection
Reconnecting...
Заранее спасибо....