Я потратил довольно много времени на изучение этого (в учебных целях) и постараюсь объяснить работу этого кода как можно более подробно.
Во-первых, вот исходный код с аннотациями:
import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';
function backoff(maxTries, ms) { // (1)
return pipe( // (2)
retryWhen(attempts => range(1, maxTries) // (3)
.pipe(
zip(attempts, (i) => i), // (4)
map(i => i * i), // (5)
mergeMap(i => timer(i * ms)) // (6)
)
)
); // (7)
}
ajax('/api/endpoint')
.pipe(backoff(3, 250))
.subscribe(data => handleData(data));
function handleData(data) {
// ...
}
- Достаточно просто, мы создаем собственный оператор
backoff
из оператора retryWhen
. Мы сможем применить это позже в pipe
функции.
- В этом контексте метод
pipe
возвращает настраиваемый оператор.
Наш пользовательский оператор будет модифицированным оператором retryWhen
. Требуется аргумент функции. Эта функция будет вызываться один раз - в частности, когда этот retryWhen
впервые встречается / вызывается. Кстати, retryWhen
вступает в игру только, когда наблюдаемый источник выдает ошибку. Затем он предотвращает дальнейшее распространение ошибки и повторно подписывается на источник. Если источник выдает результат без ошибок (при первой подписке или при повторной попытке), retryWhen
передается и не участвует.
Несколько слов о attempts
. Это наблюдаемое. Это не наблюдаемый источник. Он создан специально для retryWhen
. Он имеет одно и только одно применение: всякий раз, когда подписка (или повторная подписка) на наблюдаемый источник приводит к ошибке, attempts
запускает next
. Нам дан attempts
, и мы можем свободно использовать его, чтобы каким-то образом реагировать на каждую неудачную попытку подписки на наблюдаемый источник.
Вот что мы собираемся делать.
Сначала мы создаем range(1, maxTries)
, наблюдаемый объект, который имеет целое число для каждой повторной попытки, которую мы готовы выполнить. range
готов сразу же запустить все свои числа, но мы должны сдержать его: нам нужно только новое число, когда произойдет еще одна повторная попытка. Итак, вот почему мы ...
... застегните его с помощью attempts
. Это означает, что каждое выдаваемое значение attempts
соединяется с одним значением range
.
Помните, что функция, в которой мы сейчас работаем, будет вызываться только один раз, и тогда attempts
будет запущен только next
один раз - для первоначальной неудавшейся подписки. Итак, на данный момент два наших заархивированных наблюдаемых объекта дали только одно значение.
Кстати, каковы значения двух наблюдаемых, упакованных в одну? Эта функция решает, что: (i) => i
. Для наглядности можно написать (itemFromRange, itemFromAttempts) => itemFromRange
. Второй аргумент не используется, поэтому он отбрасывается, а первый переименовывается в i
.
Здесь происходит следующее: мы просто игнорируем значения, запущенные attempts
, нас интересует только факт, что они активированы. И всякий раз, когда это происходит, мы извлекаем следующее значение из range
наблюдаемого ...
... и возьми его в квадрат. Это для экспоненциальной части экспоненциального отката.
Итак, теперь всякий раз, когда (повторная) подписка на исходный код не удается, у нас в руках постоянно увеличивающееся целое число (1, 4, 9, 16 ...). Как преобразовать это целое число во временную задержку до следующей повторной подписки?
Помните, что эта функция, в которой мы сейчас находимся, должна возвращать наблюдаемое, используя attempts
в качестве входных данных. Эта результирующая наблюдаемая строится только один раз. retryWhen
затем подписывается на полученный наблюдаемый объект и: повторяет попытку подписки на наблюдаемый источник всякий раз, когда возникает наблюдаемый объект next
; вызывает complete
или error
для наблюдаемого источника всякий раз, когда результирующий наблюдаемый объект запускает соответствующие события.
Короче говоря, нам нужно заставить retryWhen
немного подождать. Можно использовать оператор delay
, но установка экспоненциального роста задержки, вероятно, была бы болезненной. Вместо этого в игру вступает оператор mergeMap
.
mergeMap
- это сочетание клавиш для двух операторов: map
и mergeAll
. map
просто преобразует каждое увеличивающееся целое число (1, 4, 9, 16 ...) в timer
наблюдаемое, которое запускается next
после прохождения количества миллисекунд. mergeAll
заставляет retryWhen
подписаться на timer
. Если бы этого последнего бита не было, наш результирующий наблюдаемый объект просто сразу next
сработал бы с timer
экземпляром наблюдаемого в качестве значения.
На данный момент мы создали нашу настраиваемую наблюдаемую, которая будет использоваться retryWhen
, чтобы решить, когда именно пытаться повторно подписаться на наблюдаемую исходную информацию.
В настоящее время я вижу две проблемы с этой реализацией:
Как только наш результирующий наблюдаемый запускает свой последний next
(вызывая последнюю попытку повторной подписки), он также немедленно запускает complete
. Если наблюдаемый источник не возвращает результат очень быстро (при условии, что последняя попытка будет успешной), этот результат будет проигнорирован.
Это потому, что как только retryWhen
слышит complete
от нашего наблюдаемого, он вызывает complete
в источнике, который, возможно, все еще находится в процессе выполнения запроса AJAX.
Если все попытки были неудачными, источник фактически вызывает complete
вместо более логичного error
.
Чтобы решить обе эти проблемы, я думаю, что наша результирующая наблюдаемая должна сработать error
в самом конце, после того, как последняя попытка даст некоторое разумное время, чтобы попытаться выполнить свою работу.
Вот моя реализация указанного исправления, которая также учитывает устаревание оператора zip
в последней версии rxjs v6
:
import { delay, dematerialize, map, materialize, retryWhen, switchMap } from "rxjs/operators";
import { concat, pipe, range, throwError, timer, zip } from "rxjs";
function backoffImproved(maxTries, ms) {
return pipe(
retryWhen(attempts => {
const observableForRetries =
zip(range(1, maxTries), attempts)
.pipe(
map(([elemFromRange, elemFromAttempts]) => elemFromRange),
map(i => i * i),
switchMap(i => timer(i * ms))
);
const observableForFailure =
throwError(new Error('Could not complete AJAX request'))
.pipe(
materialize(),
delay(1000),
dematerialize()
);
return concat(observableForRetries, observableForFailure);
})
);
}
Я протестировал этот код, и, похоже, он работает правильно во всех случаях. Я не хочу сейчас объяснять это подробно; Я сомневаюсь, что кто-то вообще прочитает стену с текстом выше.
В любом случае, большое спасибо @BenjaminGruenbaum и @cartant за то, что они указали мне правильный путь, чтобы осознать все это.
person
CBlew
schedule
27.10.2018
retryWhen
повторные попытки по критерию -attempts
- это поток ошибок.zip
берет следующий индекс из диапазона.map
умножает его на себя (это экспоненциальная часть, поскольку x * x - это просто x ** 2), mapMerge ожидает завершения таймера перед продолжением следующей попытки. Подключение его к ajax сообщит ему, что нужно повторить попытку при возникновении ошибок ajax, а затем экспоненциально откатится (когда поток ошибок выдает ошибку - zip продолжается, карта запускается, и mergeMap ожидает таймера, прежде чем продолжить). - person Benjamin Gruenbaum   schedule 26.10.2018fetch
и реализовать его без Rx, что будет ~ 5 LoC обычного JavaScript и цикла for с try / catch: D - person Benjamin Gruenbaum   schedule 26.10.2018zip
здесь работает ... почему он не создает новый Observable изzip
каждый раз при повторной подписке? - person Explosion Pills   schedule 26.10.2018retryWhen
только один раз подписывается на наблюдаемое, которое находится внутри него. Он передает любые ошибки, полученные от источника, в наблюдаемое, состоящее из него - параметрattempts
в стрелочной функции является наблюдаемым для ошибок. По сути,range
наблюдаемые "семена"zip
с количеством повторных попыток, которые должны быть предприняты. - person cartant   schedule 27.10.2018zip
захватываетrange
наблюдаемый (1, 2, 3) и объединяет его с вводомattempts
? Но зачем это делать? Я удалилzip
изpipe
, и функция, похоже, работает так же. Так в чем был смысл застегивания? Разве мы не можем просто игнорироватьattempts
? - person CBlew   schedule 27.10.2018range
само по себе приведет к повторной подписке навсегда по сравнению с подходомzip
? - person Explosion Pills   schedule 29.10.2018