У меня есть этот простой код ниже, который имитирует сценарий, который я сейчас пытаюсь выполнить.
mApiService.api().postSomethingWithAccessToken(request, "some_invalid_access_token")
.subscribeOn(Schedulers.io())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<AccessToken>>() {
@Override
public ObservableSource<AccessToken> apply(Observable<Throwable> throwableObservable) throws Exception {
return mApiService.api().getAccessToken();
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Observer<Void>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Void value) {
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
onError(e);
}
@Override
public void onComplete() {
}
});
Я просто перечислю это, чтобы прояснить мою цель:
- выполнить вызов POST с текущим токеном доступа
- если он получает соответствующую ошибку (404,403, 401 или что-то подобное)
- выполнить вызов GET, чтобы получить новый токен доступа
- повторите всю последовательность, используя новый токен доступа
основываясь на приведенном выше коде и моем понимании .retryWhen(), он будет выполнен, если произойдет ошибка в исходном Observable( .postSomethingWithAccessToken( )) и при необходимости повторить попытку (на основе ваших условий внутри повторной попытки), здесь происходит то, что .retryWhen() выполняется сначала перед внешним Observable, вызывая нежелательный дублирующий запрос, как Могу ли я достичь тех вещей, которые я упомянул выше, на основе моего текущего понимания (кода)? Любая помощь будет оценена. :(
Изменить: текущий обходной путь:
mApiService.api().postSomethingWithAccessToken(request, preferences.getString("access_token", ""))
.subscribeOn(Schedulers.io())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(final Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
if (throwable instanceof HttpException) {
HttpException httpException = (HttpException) throwable;
if (httpException.code() == 401) {
return mApiService.api().getAccessToken()
.doOnNext(new Consumer<Authentication>() {
@Override
public void accept(Authentication authentication) throws Exception {
update(authentication);
}
});
}
}
return Observable.error(throwable);
}
});
}
})
.subscribe(new Observer<Void>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("subscribe", "TOKEN : " + preferences.getString("access_token", ""));
}
@Override
public void onNext(Void value) {
Log.e("onNext", "TOKEN : " + preferences.getString("access_token", ""));
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
Log.e("Complete", "____ COMPLETE");
}
});
Метод, который обновляет токен через общие настройки
public void update(Authentication authentication) {
preferences.edit().putString("access_token", authentication.getAccessToken()).commit();
}
Я заметил, что (я поставил журнал) внешняя наблюдаемая подписка и retryWhen выполнялись в основном потоке, но поток повторной попытки/повторной подписки перескакивает через другой поток планировщика, это похоже на состояние гонки :(
onSubscrbie_outer_observable: Thread[main,5,main]
RetryWhen: Thread[main,5,main]
Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main]
doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main]
Throwable_FlatMap: Thread[RxCachedThreadScheduler-2,5,main]
doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-2,5,main]
Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main]
doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main]
// and so on...