Retrofit2+RxJava2, неверный токен, как обновить поток при повторной подписке retryWhen()

У меня есть этот простой код ниже, который имитирует сценарий, который я сейчас пытаюсь выполнить.

mApiService.api().postSomethingWithAccessToken(request, "some_invalid_access_token")
            .subscribeOn(Schedulers.io())
            .retryWhen(new Function<Observable<Throwable>, ObservableSource<AccessToken>>() {

                @Override
                public ObservableSource<AccessToken> apply(Observable<Throwable> throwableObservable) throws Exception {
                    return mApiService.api().getAccessToken();
                }
            })
            .subscribeOn(Schedulers.io())
            .subscribe(new Observer<Void>() {
                @Override
                public void onSubscribe(Disposable d) {
                }

                @Override
                public void onNext(Void value) {
                }

                @Override
                public void onError(Throwable e) {

                    e.printStackTrace();
                    onError(e);
                }

                @Override
                public void onComplete() {
                }
            });

Я просто перечислю это, чтобы прояснить мою цель:

  1. выполнить вызов POST с текущим токеном доступа
  2. если он получает соответствующую ошибку (404,403, 401 или что-то подобное)
  3. выполнить вызов GET, чтобы получить новый токен доступа
  4. повторите всю последовательность, используя новый токен доступа

основываясь на приведенном выше коде и моем понимании .retryWhen(), он будет выполнен, если произойдет ошибка в исходном Observable( .postSomethingWithAccessToken( )) и при необходимости повторить попытку (на основе ваших условий внутри повторной попытки), здесь происходит то, что .retryWhen() выполняется сначала перед внешним Observable, вызывая нежелательный дублирующий запрос, как Могу ли я достичь тех вещей, которые я упомянул выше, на основе моего текущего понимания (кода)? Любая помощь будет оценена. :(

Изменить: текущий обходной путь:

mApiService.api().postSomethingWithAccessToken(request, preferences.getString("access_token", ""))
            .subscribeOn(Schedulers.io())
            .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {

                @Override
                public ObservableSource<?> apply(final Observable<Throwable> throwableObservable) throws Exception {

                    return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {

                        @Override
                        public ObservableSource<?> apply(Throwable throwable) throws Exception {

                            if (throwable instanceof HttpException) {

                                HttpException httpException = (HttpException) throwable;

                                if (httpException.code() == 401) {

                                    return mApiService.api().getAccessToken()
                                            .doOnNext(new Consumer<Authentication>() {
                                                @Override
                                                public void accept(Authentication authentication) throws Exception {
                                                    update(authentication);
                                                }
                                            });
                                }
                            }

                            return Observable.error(throwable);
                        }
                    });
                }
            })
            .subscribe(new Observer<Void>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.e("subscribe", "TOKEN : " + preferences.getString("access_token", ""));
                }

                @Override
                public void onNext(Void value) {
                    Log.e("onNext", "TOKEN : " + preferences.getString("access_token", ""));
                }

                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                }

                @Override
                public void onComplete() {
                    Log.e("Complete", "____ COMPLETE");
                }
            });

Метод, который обновляет токен через общие настройки

public void update(Authentication authentication) {
    preferences.edit().putString("access_token", authentication.getAccessToken()).commit();
}

Я заметил, что (я поставил журнал) внешняя наблюдаемая подписка и retryWhen выполнялись в основном потоке, но поток повторной попытки/повторной подписки перескакивает через другой поток планировщика, это похоже на состояние гонки :(

    onSubscrbie_outer_observable: Thread[main,5,main]
    RetryWhen: Thread[main,5,main]
    Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main]
    doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main]
    Throwable_FlatMap: Thread[RxCachedThreadScheduler-2,5,main]
    doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-2,5,main]
    Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main]
    doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main]
    // and so on...

person Robert    schedule 19.04.2017    source источник


Ответы (2)


Здесь есть несколько проблем:

  • вам нужно передать токен доступа методу postSomethingWithAccessToken при повторной попытке, иначе вы просто повторите попытку с тем же старым недействительным токеном доступа.
  • ваша повторная попытка, когда логика неверна, вы должны реагировать на ошибки Observable, которые вы получаете, и помещать туда свою логику повторной попытки. как вы сказали, этот метод выполняется первым, а не когда возникает ошибка, throwableObservable - это ответ на ошибку, он будет отражать ошибки как выбросы (onNext()), вы можете flatMap() каждую ошибку и ответ либо с ошибкой (для доставки ошибки источнику stream) complete или с помощью onNext() с каким-либо объектом, чтобы сигнализировать о повторной попытке.
    Отличный запрет сообщений в блоге Дэна Лью на эту тему.

Итак, вам нужно:
1) хранить токен доступа где-то, где вы можете изменить его с помощью обновления токена доступа.
2) исправить повторную попытку, когда логика правильно реагирует на ошибки.

Вот код предложения:

postSomethingWithAccessToken(request, accessToken)
        .subscribeOn(Schedulers.io())
        .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                   @Override
                   public ObservableSource<?> apply(
                           @NonNull Observable<Throwable> throwableObservable) throws Exception {
                       return throwableObservable.flatMap(
                               new Function<Throwable, ObservableSource<? extends R>>() {
                                   @Override
                                   public ObservableSource<? extends R> apply(
                                           @NonNull Throwable throwable) throws Exception {
                                       if (throwable.code == 401) { //or 404/403, just a pseudo-code, put your real error comparing logic here
                                           return getAccessToken()
                                                           .doOnNext(refreshedToken -> accessToken.updateToken(refreshedToken));
                                                   //or keep accessToken on some field, the point to have mutable
                                                   //var that you can change and postSomethingWithAccessToken can see
                                       }
                                       return Observable.error(throwable);
                                   }
                               });
                       }
                   }
        )
        .subscribeOn(Schedulers.io())
        .subscribe(new Consumer<Result>() {
                       @Override
                       public void accept(@NonNull Result result) throws Exception {
                           //handle result
                       }
                   }
        );
person yosriz    schedule 19.04.2017
comment
привет, большое спасибо за предоставленный пример кода и упоминание об ошибках, если это не слишком много, чтобы спросить, не могли бы вы изменить код на не-лямбда?, - person Robert; 19.04.2017
comment
Большое спасибо !, плохо доберусь, плохо попробую переработать мой код. :), спасибо и за ссылку :) - person Robert; 19.04.2017
comment
просто вопрос, действительно ли нормально, что retryWhen() выполняется первым? до фактического внешнего наблюдаемого выполнения? - я не знаю, правильно ли я понимаю -> Factory Func1 вызывается при подписке для настройки логики повторных попыток. Таким образом, при вызове onError вы уже определили, как с этим справиться. - person Robert; 19.04.2017
comment
да, как объяснено, логика повтора сначала строится вашим методом Func1, затем подпишется throwableObservable и только потом исходный Observable. поэтому вы не можете выполнять там реальную работу, вы должны интегрировать логику повторных попыток в поток throwableObservable. это означает, что повторная попытка основана на реагировании на выбросы ошибок throwableObservable, а не на тело метода Func1. - person yosriz; 19.04.2017
comment
предложенный вами код полностью работает, у меня просто есть некоторые проблемы с обновлением токена даже в виде статической переменной или поля объекта, что делает повторную попытку безостановочной, в любом случае, логика повторной попытки сначала создается вашим методом Func1, затем throwableObservable подпишется и только потом исходный Observable. - спасибо, это хороший старт для меня :) :) спасибо спасибо - person Robert; 20.04.2017
comment
привет еще раз, не могли бы вы подсказать мне, как обновить источник токена? Я опубликовал свою текущую реализацию, я немного подправил предложенный вами код выше, и он сработал как шарм, но он продолжает использовать тот же accessToken, он не обновляется даже как поле, статическое, теперь в этом случае токен сохраняется через предпочтение, он продолжает использовать старый токен :( - person Robert; 20.04.2017
comment
Давайте продолжим обсуждение в чате. - person yosriz; 20.04.2017

БОЛЬШОЕ Спасибо yosriz за то, что он указал мне правильное направление для решения моей проблемы со скрежетом зубов, я должен использовать defer. Итак, я столкнулся с этой проблемой в GitHub, Зачем повторно подписывать исходный наблюдаемый вывод, когда я использую retryWhen оператор?

Это точно такая же проблема, с которой я сталкиваюсь сейчас, для тех, кто испытывает ту же проблему, вот мое решение.

Observable
    .defer(new Callable<ObservableSource<?>>() {
        @Override
        public ObservableSource<?> call() throws Exception {
            // return an observable source here, the observable that will be the source of the entire stream;
        }
    })
    .subscribeOn( /*target thread to run*/ )
    .retryWhen( {
        // return a throwable observable here that will perform the logic when an error occurred
    })
    .subscribe( /*subscription here*/ )

или вот полная нелямбда моего решения

Observable
    .defer(new Callable<ObservableSource<?>>() {
        @Override
        public ObservableSource<?> call() throws Exception {
            return mApiService.api().postSomethingWithAccessToken(
                request, preferences.getString("access_token", ""));
        }
    })
    .subscribeOn(Schedulers.io())
    .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(final Observable<Throwable> throwableObservable) throws Exception {
            return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(Throwable throwable) throws Exception {
                    if (throwable instanceof HttpException) {
                        HttpException httpException = (HttpException) throwable;
                        if (httpException.code() == 401) {
                            return mApiService.api().getAccessToken().doOnNext(new Consumer<Authentication>() {
                                    @Override
                                    public void accept(Authentication authentication) throws Exception {
                                        update(authentication);
                                    }
                                });
                        }
                    }
                    return Observable.error(throwable);
                }
            });
        }
    })
    .subscribe(new Observer<Void>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e("subscribe", "TOKEN : " + preferences.getString("access_token", ""));
        }

        @Override
        public void onNext(Void value) {
            Log.e("onNext", "TOKEN : " + preferences.getString("access_token", ""));
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }

        @Override
        public void onComplete() {
            Log.e("Complete", "____ COMPLETE");
        }
    });

Ключевым моментом здесь является «как изменить/обновить существующий наблюдаемый источник, когда оператор .retryWhen() повторно подписывается на наблюдаемый источник»

person Robert    schedule 21.04.2017