Реализация экспоненциального отката с помощью rxjs

В docs Angular 7 приведен пример практического использования rxjs Observables при реализации экспоненциальный откат для запроса AJAX:

import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';

function backoff(maxTries, ms) {
 return pipe(
   retryWhen(attempts => range(1, maxTries)
     .pipe(
       zip(attempts, (i) => i),
       map(i => i * i),
       mergeMap(i =>  timer(i * ms))
     )
   )
 );
}

ajax('/api/endpoint')
  .pipe(backoff(3, 250))
  .subscribe(data => handleData(data));

function handleData(data) {
  // ...
}

Хотя я понимаю концепцию Observables и backoff, я не могу понять, как именно retryWhen будет рассчитывать временные интервалы для повторной подписки на источник ajax.

В частности, как zip, _ 7_ и _ 8_ работает с этой настройкой?

И что будет содержаться в объекте attempts, когда он будет передан в retryWhen?

Я просмотрел их справочные страницы, но до сих пор не могу осмыслить это.


person CBlew    schedule 26.10.2018    source источник
comment
Rx просто немного запутан в изучении. retryWhen повторные попытки по критерию - attempts - это поток ошибок. zip берет следующий индекс из диапазона. map умножает его на себя (это экспоненциальная часть, поскольку x * x - это просто x ** 2), mapMerge ожидает завершения таймера перед продолжением следующей попытки. Подключение его к ajax сообщит ему, что нужно повторить попытку при возникновении ошибок ajax, а затем экспоненциально откатится (когда поток ошибок выдает ошибку - zip продолжается, карта запускается, и mergeMap ожидает таймера, прежде чем продолжить).   -  person Benjamin Gruenbaum    schedule 26.10.2018
comment
И, конечно же, вы можете просто использовать fetch и реализовать его без Rx, что будет ~ 5 LoC обычного JavaScript и цикла for с try / catch: D   -  person Benjamin Gruenbaum    schedule 26.10.2018
comment
@BenjaminGruenbaum, я не понимаю, как zip здесь работает ... почему он не создает новый Observable из zip каждый раз при повторной подписке?   -  person Explosion Pills    schedule 26.10.2018
comment
@ExplosionPills Оператор retryWhen только один раз подписывается на наблюдаемое, которое находится внутри него. Он передает любые ошибки, полученные от источника, в наблюдаемое, состоящее из него - параметр attempts в стрелочной функции является наблюдаемым для ошибок. По сути, range наблюдаемые "семена" zip с количеством повторных попыток, которые должны быть предприняты.   -  person cartant    schedule 27.10.2018
comment
@BenjaminGruenbaum @cartant Итак, оператор zip захватывает range наблюдаемый (1, 2, 3) и объединяет его с вводом attempts? Но зачем это делать? Я удалил zip из pipe, и функция, похоже, работает так же. Так в чем был смысл застегивания? Разве мы не можем просто игнорировать attempts?   -  person CBlew    schedule 27.10.2018
comment
@CBlew наблюдаемые попытки запускают элемент всякий раз, когда HTTP-запрос терпит неудачу.   -  person Benjamin Gruenbaum    schedule 27.10.2018
comment
@cartant У меня тот же вопрос, что и у CBlew ... если он подписывается только один раз на составленную наблюдаемую, как получается, что использование только range само по себе приведет к повторной подписке навсегда по сравнению с подходом zip?   -  person Explosion Pills    schedule 29.10.2018


Ответы (1)


Я потратил довольно много времени на изучение этого (в учебных целях) и постараюсь объяснить работу этого кода как можно более подробно.

Во-первых, вот исходный код с аннотациями:

import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';

function backoff(maxTries, ms) {                  // (1)
 return pipe(                                     // (2)
   retryWhen(attempts => range(1, maxTries)       // (3)
     .pipe(
       zip(attempts, (i) => i),                   // (4)
       map(i => i * i),                           // (5)
       mergeMap(i =>  timer(i * ms))              // (6)
     )
   )
 );                                               // (7)
}

ajax('/api/endpoint')
  .pipe(backoff(3, 250))
  .subscribe(data => handleData(data));

function handleData(data) {
  // ...
}
  1. Достаточно просто, мы создаем собственный оператор backoff из оператора retryWhen. Мы сможем применить это позже в pipe функции.
  2. В этом контексте метод pipe возвращает настраиваемый оператор.
  3. Наш пользовательский оператор будет модифицированным оператором retryWhen. Требуется аргумент функции. Эта функция будет вызываться один раз - в частности, когда этот retryWhen впервые встречается / вызывается. Кстати, retryWhen вступает в игру только, когда наблюдаемый источник выдает ошибку. Затем он предотвращает дальнейшее распространение ошибки и повторно подписывается на источник. Если источник выдает результат без ошибок (при первой подписке или при повторной попытке), retryWhen передается и не участвует.

    Несколько слов о attempts. Это наблюдаемое. Это не наблюдаемый источник. Он создан специально для retryWhen. Он имеет одно и только одно применение: всякий раз, когда подписка (или повторная подписка) на наблюдаемый источник приводит к ошибке, attempts запускает next. Нам дан attempts, и мы можем свободно использовать его, чтобы каким-то образом реагировать на каждую неудачную попытку подписки на наблюдаемый источник.

    Вот что мы собираемся делать.

    Сначала мы создаем range(1, maxTries), наблюдаемый объект, который имеет целое число для каждой повторной попытки, которую мы готовы выполнить. range готов сразу же запустить все свои числа, но мы должны сдержать его: нам нужно только новое число, когда произойдет еще одна повторная попытка. Итак, вот почему мы ...

  4. ... застегните его с помощью attempts. Это означает, что каждое выдаваемое значение attempts соединяется с одним значением range.

    Помните, что функция, в которой мы сейчас работаем, будет вызываться только один раз, и тогда attempts будет запущен только next один раз - для первоначальной неудавшейся подписки. Итак, на данный момент два наших заархивированных наблюдаемых объекта дали только одно значение.

    Кстати, каковы значения двух наблюдаемых, упакованных в одну? Эта функция решает, что: (i) => i. Для наглядности можно написать (itemFromRange, itemFromAttempts) => itemFromRange. Второй аргумент не используется, поэтому он отбрасывается, а первый переименовывается в i.

    Здесь происходит следующее: мы просто игнорируем значения, запущенные attempts, нас интересует только факт, что они активированы. И всякий раз, когда это происходит, мы извлекаем следующее значение из range наблюдаемого ...

  5. ... и возьми его в квадрат. Это для экспоненциальной части экспоненциального отката.

    Итак, теперь всякий раз, когда (повторная) подписка на исходный код не удается, у нас в руках постоянно увеличивающееся целое число (1, 4, 9, 16 ...). Как преобразовать это целое число во временную задержку до следующей повторной подписки?

    Помните, что эта функция, в которой мы сейчас находимся, должна возвращать наблюдаемое, используя attempts в качестве входных данных. Эта результирующая наблюдаемая строится только один раз. retryWhen затем подписывается на полученный наблюдаемый объект и: повторяет попытку подписки на наблюдаемый источник всякий раз, когда возникает наблюдаемый объект next; вызывает complete или error для наблюдаемого источника всякий раз, когда результирующий наблюдаемый объект запускает соответствующие события.

  6. Короче говоря, нам нужно заставить retryWhen немного подождать. Можно использовать оператор delay, но установка экспоненциального роста задержки, вероятно, была бы болезненной. Вместо этого в игру вступает оператор mergeMap.

    mergeMap - это сочетание клавиш для двух операторов: map и mergeAll. map просто преобразует каждое увеличивающееся целое число (1, 4, 9, 16 ...) в timer наблюдаемое, которое запускается next после прохождения количества миллисекунд. mergeAll заставляет retryWhen подписаться на timer. Если бы этого последнего бита не было, наш результирующий наблюдаемый объект просто сразу next сработал бы с timer экземпляром наблюдаемого в качестве значения.

  7. На данный момент мы создали нашу настраиваемую наблюдаемую, которая будет использоваться retryWhen, чтобы решить, когда именно пытаться повторно подписаться на наблюдаемую исходную информацию.

В настоящее время я вижу две проблемы с этой реализацией:

  • Как только наш результирующий наблюдаемый запускает свой последний next (вызывая последнюю попытку повторной подписки), он также немедленно запускает complete. Если наблюдаемый источник не возвращает результат очень быстро (при условии, что последняя попытка будет успешной), этот результат будет проигнорирован.

    Это потому, что как только retryWhen слышит complete от нашего наблюдаемого, он вызывает complete в источнике, который, возможно, все еще находится в процессе выполнения запроса AJAX.

  • Если все попытки были неудачными, источник фактически вызывает complete вместо более логичного error.

Чтобы решить обе эти проблемы, я думаю, что наша результирующая наблюдаемая должна сработать error в самом конце, после того, как последняя попытка даст некоторое разумное время, чтобы попытаться выполнить свою работу.

Вот моя реализация указанного исправления, которая также учитывает устаревание оператора zip в последней версии rxjs v6:

import { delay, dematerialize, map, materialize, retryWhen, switchMap } from "rxjs/operators";
import { concat, pipe, range, throwError, timer, zip } from "rxjs";

function backoffImproved(maxTries, ms) {
    return pipe(
        retryWhen(attempts => {
            const observableForRetries =
                zip(range(1, maxTries), attempts)
                    .pipe(
                        map(([elemFromRange, elemFromAttempts]) => elemFromRange),
                        map(i => i * i),
                        switchMap(i => timer(i * ms))
                    );
            const observableForFailure =
                throwError(new Error('Could not complete AJAX request'))
                    .pipe(
                        materialize(),
                        delay(1000),
                        dematerialize()
                    );
            return concat(observableForRetries, observableForFailure);
        })
    );
}

Я протестировал этот код, и, похоже, он работает правильно во всех случаях. Я не хочу сейчас объяснять это подробно; Я сомневаюсь, что кто-то вообще прочитает стену с текстом выше.

В любом случае, большое спасибо @BenjaminGruenbaum и @cartant за то, что они указали мне правильный путь, чтобы осознать все это.

person CBlew    schedule 27.10.2018
comment
Это выглядит слишком сложно. Возможно, вам стоит взглянуть на реализацию Алекса: github.com/alex-okrushko/backoff-rxjs - person cartant; 29.10.2018
comment
Я действительно прочитал стену текста - очень интересный случай [де] материализации. Я посмотрю, смогу ли я объединить ошибки всех повторных попыток и выдать их при выводе retryWhen - person user776686; 06.04.2021