Как заставить flatMap выполняться в фоновом потоке

Я использую Retrofit и RxJava для выполнения некоторых фоновых задач. Код выглядит так:

public class MyLoader{  
  public Observable<MyData> getMyData(){
      return setupHelper().flatMap(new Func1<MyHelper, Observable<MyData>>() {
              @Override
              public Observable<MyData> call(MyHelper myHelper) {
                  return queryData(myHelper);
              }
      });
  }

  private Observable<MyData> queryData(MyHelper myHelper){
      ...
  }

  private Observable<MyHelper> setupHelper(){
     return Observable.create(new Observable.OnSubscribe<MyHelper>() {
          @Override
          public void call(final Subscriber<? super MyHelper> subscriber) {
              try{
                MyHelper helper = makeRetrofitCall();//Using Retrofit blocking call to get some data
                subscriber.onNext(helper);
                subscriber.onCompleted();
              }catch(RetrofitError e){
                subscriber.onError(e)
              }
          }
     }
  }
}

Это не удается с RetrofitError из-за исключения NetworkOnMainThread в этой строке:

  MyHelper helper = makeRetrofitCall();//Using Retrofit blocking call to get some data

Подписка на мой Observable:

myLoader.getMyData()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<MyData>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(MyData inventory) {

                    }
                });

Согласно документации Rx, flatMap не работает ни с одним фоновым потоком. Мой вопрос заключается в том, как мне убедиться, что весь метод getMyData() работает в фоновом режиме.


person Community    schedule 03.09.2015    source источник
comment
На первый взгляд, я не вижу ничего плохого. Не могли бы вы вместо этого поместить .subscribeOn(Schedulers.io()) в setupHelper?   -  person akarnokd    schedule 03.09.2015
comment
Retrofit уже интегрируется с RxJava и может напрямую возвращать Observables, поэтому вам не нужно создавать собственные Observables, выполняющие запросы.   -  person BladeCoder    schedule 08.09.2015
comment
Вы нашли другой подход, чем добавление .subscribeOn() во второй наблюдаемый?   -  person Juan Saravia    schedule 09.12.2015
comment
Кажется, это правильный ответ: stackoverflow.com/a/35429084/2908525   -  person Juan Bustamante    schedule 08.05.2017
comment
Retrofit может возвращать Observable напрямую, вам не нужно оборачивать вызов таким образом. В противном случае вы также можете просто использовать Observable.fromCallable(this::makeRetrofitCall) (при условии, что у вас есть ссылки на функции либо через Java 8, либо через retrolambda).   -  person njzk2    schedule 08.05.2017


Ответы (3)



Я просто добавляю observeOn(Schedulers.newThread()) перед flatMap и все работает!

person initialjie    schedule 31.03.2016
comment
Добро пожаловать на сайт! Пожалуйста, используйте ответы исключительно для ответа на вопрос. Если у вас есть другой вопрос, задайте его, нажав кнопку Задать вопрос в верхней части страницы. Если у вас точно такая же проблема, как эта, когда у вас достаточно репутации, вы можете проголосовать за вопрос или добавьте награду. Вы также можете пометить вопрос как избранный, и в этом случае система будет уведомлять вас о любых новых ответах. - person Mogsdad; 31.03.2016
comment
Почему мой ответ не принят? Я не вижу ничего плохого. Кто-нибудь может сказать мне, почему? - person initialjie; 18.04.2016
comment
Я думаю, это потому, что, вызываяObservOn перед flatMap, вы переключаете все на фоновые потоки, а не только один шаг в конвейере. - person Juan Bustamante; 08.05.2017
comment
@Juan Bustamante subscribeOn действует только во время последнего вызова. - person initialjie; 03.03.2018

Это перемещает только один шаг в конвейере к фоновому потоку:

Observable<Integer> vals = Observable.range(1,10);

vals.flatMap(val -> Observable.just(val)
            .subscribeOn(Schedulers.computation())
            .map(i -> intenseCalculation(i))
).subscribe(val -> System.out.println(val));

Первоначально ответил здесь: https://stackoverflow.com/a/35429084/2908525

person Juan Bustamante    schedule 08.05.2017

Есть хороший шанс, что когда вы создаете объект MyLoader в основном потоке, Observable.create также будет выполнен (или, может быть, где-то еще раньше в вашем коде (?)). Если это так, то .subscribeOn(Schedulers.io()) не повлияет на изменение потока.

Вы можете попробовать обернуть .create() с помощью .defer(), чтобы убедиться, что Observable создается только при подписке.

e.g. defer(() -> create(....))

person Diolor    schedule 07.09.2015