Прекарах доста време в проучване на това (с цел обучение) и ще се опитам да обясня работата на този код възможно най-подробно.
Първо, ето оригиналния код, пояснен:
import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';
function backoff(maxTries, ms) { // (1)
return pipe( // (2)
retryWhen(attempts => range(1, maxTries) // (3)
.pipe(
zip(attempts, (i) => i), // (4)
map(i => i * i), // (5)
mergeMap(i => timer(i * ms)) // (6)
)
)
); // (7)
}
ajax('/api/endpoint')
.pipe(backoff(3, 250))
.subscribe(data => handleData(data));
function handleData(data) {
// ...
}
- Достатъчно лесно, създаваме персонализиран оператор
backoff
от оператор retryWhen
. Ще можем да приложим това по-късно във функцията pipe
.
- В този контекст методът
pipe
връща потребителски оператор.
Нашият персонализиран оператор ще бъде модифициран оператор retryWhen
. Необходим е аргумент на функцията. Тази функция ще бъде извикана веднъж — по-специално, когато този retryWhen
бъде срещнат/извикан за първи път. Между другото, retryWhen
влиза в действие само когато наблюдаемият източник генерира грешка. След това предотвратява по-нататъшното разпространение на грешката и се абонира отново за източника. Ако източникът генерира резултат без грешка (независимо дали при първо абониране или при повторен опит), retryWhen
се пропуска и не се включва.
Няколко думи за attempts
. Това е наблюдаемо. Не е източникът, който може да се наблюдава. Създаден е специално за retryWhen
. Има едно и само едно използване: когато абонаментът (или повторният абонамент) за наблюдаемия източник доведе до грешка, attempts
задейства next
. Дадено ни е attempts
и сме свободни да го използваме, за да реагираме по някакъв начин на всеки неуспешен опит за абониране за наблюдаемия източник.
Така че това е, което ще направим.
Първо създаваме range(1, maxTries)
, наблюдаема, която има цяло число за всеки повторен опит, който сме готови да извършим. range
е готов да задейства всичките си номера веднага и там, но трябва да задържим конете му: имаме нужда от нов номер само когато се случи нов повторен опит. Ето защо ние...
... ципирайте го с attempts
. Това означава, че съчетайте всяка излъчена стойност на attempts
с една стойност на range
.
Не забравяйте, че функцията, в която се намираме в момента, ще бъде извикана само веднъж и по това време attempts
ще е задействал next
само веднъж — за първоначалния неуспешен абонамент. И така, в този момент нашите две компресирани наблюдаеми са произвели само една стойност.
Между другото, какви са стойностите на двете наблюдаеми, компресирани в една? Тази функция решава, че: (i) => i
. За по-голяма яснота може да се напише (itemFromRange, itemFromAttempts) => itemFromRange
. Вторият аргумент не се използва, така че е изпуснат и първият е преименуван на i
.
Какво се случва тук, е просто да пренебрегнем стойностите, задействани от attempts
, ние се интересуваме само от факта, че те са задействани. И всеки път, когато това се случи, изтегляме следващата стойност от range
наблюдаема...
... и го на квадрат. Това е за експоненциалната част от експоненциалното забавяне.
И така, сега, когато (повторното) абониране за източник е неуспешно, имаме непрекъснато нарастващо цяло число (1, 4, 9, 16...). Как да трансформираме това цяло число във времево забавяне до следващото повторно абониране?
Не забравяйте, че тази функция, в която се намираме в момента, трябва да върне наблюдаема, използвайки attempts
като вход. Тази получена наблюдаема се изгражда само веднъж. retryWhen
след това се абонира за тази получена наблюдаема и: прави повторен опит да се абонира за източника наблюдаема всеки път, когато получената наблюдаема се задейства next
; извиква complete
или error
на източник observable, когато резултатът observable задейства тези съответни събития.
Накратко, трябва да накараме retryWhen
да изчака малко. Може да се използва оператор delay
, но настройването на експоненциално нарастване на забавянето вероятно ще бъде болезнено. Вместо това влиза в действие операторът mergeMap
.
mergeMap
е пряк път за комбиниране на два оператора: map
и mergeAll
. map
просто преобразува всяко нарастващо цяло число (1, 4, 9, 16...) в timer
наблюдаем, който задейства next
след изминал брой милисекунди. mergeAll
принуждава retryWhen
действително да се абонира за timer
. Ако последният бит не се случи, нашата получена наблюдаема просто ще задейства next
незабавно с timer
наблюдаема инстанция като стойност.
На този етап създадохме нашия персонализиран наблюдаем, който ще се използва от retryWhen
, за да реши кога точно да се опита да се абонира отново за източника наблюдаем.
В настоящия момент виждам два проблема с тази реализация:
Веднага след като нашата получена наблюдаема задейства последния си next
(причинявайки последния опит за повторно абониране), тя също незабавно задейства complete
. Освен ако наблюдаемият източник не върне резултат много бързо (приемайки, че последният повторен опит ще бъде успешният), този резултат ще бъде игнориран.
Това е така, защото веднага щом retryWhen
чуе complete
от нашия наблюдаем, той извиква complete
на източника, който все още може да е в процес на правене на AJAX заявка.
Ако всички повторни опити са били неуспешни, източникът всъщност извиква complete
вместо по-логично error
.
За да разрешим и двата проблема, мисля, че нашата получена наблюдаема трябва да задейства error
в самия край, след като даде на последния повторен опит известно време, за да се опита да свърши работата си.
Ето моето внедряване на споменатата корекция, което също взема предвид оттеглянето на оператора zip
в последния rxjs v6
:
import { delay, dematerialize, map, materialize, retryWhen, switchMap } from "rxjs/operators";
import { concat, pipe, range, throwError, timer, zip } from "rxjs";
function backoffImproved(maxTries, ms) {
return pipe(
retryWhen(attempts => {
const observableForRetries =
zip(range(1, maxTries), attempts)
.pipe(
map(([elemFromRange, elemFromAttempts]) => elemFromRange),
map(i => i * i),
switchMap(i => timer(i * ms))
);
const observableForFailure =
throwError(new Error('Could not complete AJAX request'))
.pipe(
materialize(),
delay(1000),
dematerialize()
);
return concat(observableForRetries, observableForFailure);
})
);
}
Тествах този код и изглежда, че работи правилно във всички случаи. Не мога да се притеснявам да го обясня подробно точно сега; Съмнявам се, че някой дори ще прочете стената с текст по-горе.
Както и да е, големи благодарности на @BenjaminGruenbaum и @cartant, че ме насочиха на правилния път, за да обмисля всичко това.
person
CBlew
schedule
27.10.2018
retryWhen
прави повторен опит по критерий -attempts
е потокът от грешки.zip
грабва следващия индекс от диапазона.map
го умножава по себе си (това е експоненциалната част, тъй като x * x е просто x ** 2), mapMerge изчаква таймерът да приключи, преди да продължи следващият опит. Прехвърлянето му към ajax ще му каже да опита отново, когато ajax възникне грешка и след това ще се откаже експоненциално (когато потокът от грешки издаде грешка - компресирането продължава, картата се изпълнява и mergeMap изчаква таймера, преди да продължи). - person Benjamin Gruenbaum   schedule 26.10.2018fetch
и да го внедрите без Rx, което би било ~5 LoC на обикновен JavaScript и for цикъл с try/catch :D - person Benjamin Gruenbaum   schedule 26.10.2018zip
работи тук ... защо не създава нов Observable отzip
всеки път, когато се абонира отново? - person Explosion Pills   schedule 26.10.2018retryWhen
се абонира само веднъж за наблюдаемото, което е съставено в него. Той подава всички грешки, получени от източника, към наблюдаемото, съставено в него - параметърътattempts
към стрелковата функция е наблюдаем от грешки. По същество,range
наблюдаемото "засява"zip
с броя повторни опити, които трябва да се опитат. - person cartant   schedule 27.10.2018zip
хващаrange
наблюдаемото (1, 2, 3) и го обединява с входаattempts
? Но защо правя това? Премахнахzip
отpipe
и функцията изглежда работи по същия начин. И така, какъв беше смисълът от ципирането? Не можем ли просто да игнорирамеattempts
? - person CBlew   schedule 27.10.2018range
само по себе си ще се абонира отново завинаги срещу подходаzip
? - person Explosion Pills   schedule 29.10.2018