RxJS е библиотека за реактивно програмиране. Операторите за създаване са полезни за генериране на данни от различни източници на данни, за които да се абонират наблюдателите.
В тази статия ще разгледаме някои прозоречни оператори, включително операторите windowCount
, windowTime
, windowToggle
и windowWhen
.
windowCount
Операторът windowCount
разклонява стойностите на източника Observable като вложена Observable, като всяка от тях излъчва най-много windowSize
събития.
Необходими са до 2 аргумента. Първият аргумент е windowSize
, което е максималният брой стойности, които трябва да бъдат излъчвани от всеки прозорец.
Вторият аргумент не е задължителен. Това е числото startWindowEvery
, което по подразбиране е 0. Това е интервалът за стартиране на нов прозорец. Интервалът се измерва с броя елементи, излъчвани от източника Observable.
Например, можем да го използваме, както следва:
import { interval } from "rxjs"; import { windowCount, mergeAll, take, map, skip } from "rxjs/operators"; const nums = interval(1000).pipe(take(1000)); const result = nums.pipe( windowCount(3, 3), map(win => win.pipe(skip(1))), mergeAll() ); result.subscribe(x => console.log(x));
Кодът по-горе има nums
Observable, който излъчва число всяка секунда до 1000.
Това е pipe
d към оператора windowCount
, който излъчва 3 стойности наведнъж и започва нов прозорец след 3 стойности, излъчени от nums
.
Тогава това е map
ped към win.pipe(skip(1))
Observable, което пропуска 1 стойност за всеки 2 излъчени стойности.
Накрая предаваме стойностите към mergeAll
, за да обединим всички наблюдаеми в едно.
След това трябва да видим, че всяко трето число не е излъчено.
windowTime
Операторът windowTime
връща Observable, който излъчва прозорци от елементи с периода на прозореца, зададен от windowTimeSpan
.
Необходими са до 2 аргумента, което е windowTimeSpan
, а вторият е незадължителен аргумент, който е scheduler
обект.
Пример би бил следният:
import { interval } from "rxjs"; import { windowTime, mergeAll, take, map } from "rxjs/operators"; const nums = interval(1000); const result = nums.pipe( windowTime(1000, 5000), map(win => win.pipe(take(2))), mergeAll() ); result.subscribe(x => console.log(x));
Кодът по-горе има nums
Observable, който излъчва стойности, започващи от 0 всяка секунда. След това излъчваните стойности са pipe
d към оператора windowTime
, който стартира прозорец на всеки 5 секунди с дължина 1 секунда. След това вземаме 2 стойности от всеки прозорец
Това ще доведе до пропускане на 3 стойности във всеки прозорец, тъй като стойностите се излъчват всяка минута от nums
, но ние take
само 2 стойности от всеки прозорец.
windowToggle
windowToggle
разклонява стойностите на източника Observable като вложени Observable, започвайки от излъчване от openings
и завършвайки с closingSelector
излъчвания.
Необходими са до 2 аргумента. Първият е openings
, който е наблюдаем от известия за стартиране на нов прозорец.
Вторият е closingSelector
, който приема стойността, излъчвана от openings
, и връща Observable, който излъчва сигнала next
или complete
, ще затвори прозореца.
Можем да го използваме по следния начин:
import { interval, EMPTY } from "rxjs"; import { windowToggle, mergeAll } from "rxjs/operators"; const interval$ = interval(2000); const openings = interval(2000); const result = interval$.pipe( windowToggle(openings, i => (i % 3 === 0 ? interval(500) : EMPTY)), mergeAll() ); result.subscribe(x => console.log(x));
Имаме interval$
Observable, който излъчва число на всеки 2 секунди. Тогава имаме същата наблюдаема за openings
. Излъчваните стойности за interval$
са pipe
d към оператора windowToggle
, който има openings
Observable като първи аргумент, който се излъчва на всеки 2 секунди. Така че започваме нов прозорец на всеки 2 секунди.
Тогава имаме втората функция:
i => (i % 3 === 0 ? interval(500) : EMPTY)
за затваряне на прозореца, когато въведената стойност не се дели на 3. Това означава, че получаваме всеки 3 стойности, излъчвани от interval$
, регистрирани.
прозорецКога
Операторът windowWhen
разклонява източника Observable с помощта на функцията closingSelector
, която връща Observable, за да затвори прозореца.
Взема функцията closingSelector
, която приема стойността, излъчвана от openings
, и връща Observable, която излъчва сигнала next
или complete
, за да затвори прозореца.
Например, можем да го използваме, както следва:
import { interval } from "rxjs"; import { mergeAll, take, windowWhen, map } from "rxjs/operators"; const interval$ = interval(2000); const result = interval$.pipe( windowWhen(() => interval(Math.random() * 4000)), map(win => win.pipe(take(2))), mergeAll() ); result.subscribe(x => console.log(x));
В кода по-горе имаме interval$
, който излъчва число на всеки 2 секунди. Тогава излъчената стойност е pipe
d към оператора windowWhen
, който има closingSelector
ve:
() => interval(Math.random() * 4000)
Това означава, че прозорецът ще се затвори и ще се отвори отново Math.random() * 4000
милисекунди.
Трябва да сме сигурни, че някои числа се излъчват по-бързо от други.
Операторът windowCount
разклонява стойностите на източника Observable като вложени Observable, като всяка от тях излъчва най-много windowSize
събития. windowSize
е размерът на всеки прозорец.
Операторът windowTime
връща Observable, който излъчва прозорци от елементи с периода на прозореца, зададен от windowTimeSpan
. windowTimeSpan
задава времето за отворен прозорец.
windowToggle
разклонява стойностите на източника Observable като вложени Observable, започвайки от излъчване от openings
и завършвайки с closingSelector
излъчвания. Функциите openings
и closingSelector
са Observable, които контролират отварянето и затварянето на прозореца съответно за всяка Observable.
Операторът windowWhen
разклонява източника Observable с помощта на функцията closingSelector
, която връща Observable, за да затвори прозореца.