Запрос по планированию потоков RxJava

Я новичок в RxJava2. В приведенном ниже коде я не могу понять, как подписчик работает в фоновом потоке, даже если Observable/Flowable испускается в основном потоке, а планировщик не указан (используя вызовы subscribeOn(Schedulers.*)). Полный код можно найти в этот репозиторий github.

@OnClick(R.id.btn_start_simple_polling)
public void onStartSimplePollingClicked() {

    _log("onStartSimplePollingClicked called on ");  //MAIN THREAD

    final int pollCount = POLL_COUNT;

    Disposable d = Observable
          .interval(INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS)
          .map(this::_doNetworkCallAndGetStringResult)
          .take(pollCount)
          .doOnSubscribe(subscription -> {
              _log(String.format("Start simple polling - %s", _counter));     //MAIN THREAD
          })
          .subscribe(taskName -> {
              _log(String.format(Locale.US,
                                 "Executing polled task [%s] now time : [xx:%02d]",
                                 taskName,
                                 _getSecondHand()));
          });

    _disposables.add(d);
}

private String _doNetworkCallAndGetStringResult(long attempt) {
    try {
        _log("_doNetworkCallAndGetStringResult called on "); //BACKGROUND THREAD
        if (attempt == 4) {
            // randomly make one event super long so we test that the repeat logic waits
            // and accounts for this.
            Thread.sleep(9000);
        }
        else {
            Thread.sleep(3000);
        }

    } catch (InterruptedException e) {
        Timber.d("Operation was interrupted");
    }
    _counter++;

    return String.valueOf(_counter);
}

person Jaguar    schedule 01.07.2017    source источник


Ответы (2)


Поскольку вы не указали планировщик для подписки, RxJava по умолчанию использует синхронную подписку. Таким образом, вызовы onSubscribe и doOnSubscribe происходят в основном потоке.

Однако оператору Observable.interval требуется либо неявный, либо явный планировщик для трансляции событий onNext. Поскольку вы не указали планировщик, по умолчанию используется Schedulers.computation(). После запуска интервала он продолжает вызывать _doNetworkCallAndGetStringResult в том же потоке вычислений, что происходит в фоновом режиме.

person Kiskae    schedule 01.07.2017

RxJava по умолчанию запускается синхронно, но некоторые операторы, как @Kiskae уже сказал вам, как интервал, задержка или некоторые другие

Если вы хотите запускать конвейер асинхронно, вам придется использовать наблюдателя, который заставит запустить конвейер в другом потоке после того, как он будет помещен в ваш конвейер.

 /**
     * Once that you set in your pipeline the observerOn all the next steps of your pipeline will be executed in another thread.
     * Shall print
     * First step main
     * Second step RxNewThreadScheduler-2
     * Third step RxNewThreadScheduler-1
     */
    @Test
    public void testObservableObserverOn() throws InterruptedException {
        Subscription subscription = Observable.just(1)
                .doOnNext(number -> System.out.println("First step " + Thread.currentThread()
                        .getName()))
                .observeOn(Schedulers.newThread())
                .doOnNext(number -> System.out.println("Second step " + Thread.currentThread()
                        .getName()))
                .observeOn(Schedulers.newThread())
                .doOnNext(number -> System.out.println("Third step " + Thread.currentThread()
                        .getName()))
                .subscribe();
        new TestSubscriber((Observer) subscription)
                .awaitTerminalEvent(100, TimeUnit.MILLISECONDS);
    }

или используйте subscribeOn, который заставит ваш конвейер работать в указанном вами потоке

/**
 * Does not matter at what point in your pipeline you set your subscribeOn, once that is set in the pipeline,
 * all steps will be executed in another thread.
 * Shall print
 * First step RxNewThreadScheduler-1
 * Second step RxNewThreadScheduler-1
 */
@Test
public void testObservableSubscribeOn() throws InterruptedException {
    Subscription subscription = Observable.just(1)
            .doOnNext(number -> System.out.println("First step " + Thread.currentThread()
                    .getName()))
            .subscribeOn(Schedulers.newThread())
            .doOnNext(number -> System.out.println("Second step " + Thread.currentThread()
                    .getName()))
            .subscribe();
    new TestSubscriber((Observer) subscription)
            .awaitTerminalEvent(100, TimeUnit.MILLISECONDS);
}

Вы можете увидеть больше примеров об асинхронном rxJava здесь https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java

person paul    schedule 01.07.2017