RxJS е библиотека за реактивно програмиране. Операторите за създаване са полезни за генериране на данни от различни източници на данни, за които да се абонират наблюдателите.

В тази статия ще разгледаме някои прозоречни оператори, включително операторите windowCount, windowTime, windowToggle и windowWhen.

windowCount

Операторът windowCount разклонява стойностите на източника Observable като вложена Observable, като всяка от тях излъчва най-много windowSize събития.

Необходими са до 2 аргумента. Първият аргумент е windowSize, което е максималният брой стойности, които трябва да бъдат излъчвани от всеки прозорец.

Вторият аргумент не е задължителен. Това е числото startWindowEvery, което по подразбиране е 0. Това е интервалът за стартиране на нов прозорец. Интервалът се измерва с броя елементи, излъчвани от източника Observable.

Например, можем да го използваме, както следва:

import { interval } from "rxjs";
import { windowCount, mergeAll, take, map, skip } from "rxjs/operators";
const nums = interval(1000).pipe(take(1000));
const result = nums.pipe(
  windowCount(3, 3),
  map(win => win.pipe(skip(1))),
  mergeAll()
);
result.subscribe(x => console.log(x));

Кодът по-горе има nums Observable, който излъчва число всяка секунда до 1000.

Това е pipe d към оператора windowCount, който излъчва 3 стойности наведнъж и започва нов прозорец след 3 стойности, излъчени от nums.

Тогава това е map ped към win.pipe(skip(1)) Observable, което пропуска 1 стойност за всеки 2 излъчени стойности.

Накрая предаваме стойностите към mergeAll, за да обединим всички наблюдаеми в едно.

След това трябва да видим, че всяко трето число не е излъчено.

windowTime

Операторът windowTime връща Observable, който излъчва прозорци от елементи с периода на прозореца, зададен от windowTimeSpan.

Необходими са до 2 аргумента, което е windowTimeSpan, а вторият е незадължителен аргумент, който е scheduler обект.

Пример би бил следният:

import { interval } from "rxjs";
import { windowTime, mergeAll, take, map } from "rxjs/operators";
const nums = interval(1000);
const result = nums.pipe(
  windowTime(1000, 5000),
  map(win => win.pipe(take(2))),
  mergeAll()
);
result.subscribe(x => console.log(x));

Кодът по-горе има nums Observable, който излъчва стойности, започващи от 0 всяка секунда. След това излъчваните стойности са pipe d към оператора windowTime, който стартира прозорец на всеки 5 секунди с дължина 1 секунда. След това вземаме 2 стойности от всеки прозорец

Това ще доведе до пропускане на 3 стойности във всеки прозорец, тъй като стойностите се излъчват всяка минута от nums, но ние take само 2 стойности от всеки прозорец.

windowToggle

windowToggle разклонява стойностите на източника Observable като вложени Observable, започвайки от излъчване от openings и завършвайки с closingSelector излъчвания.

Необходими са до 2 аргумента. Първият е openings, който е наблюдаем от известия за стартиране на нов прозорец.

Вторият е closingSelector, който приема стойността, излъчвана от openings, и връща Observable, който излъчва сигнала next или complete, ще затвори прозореца.

Можем да го използваме по следния начин:

import { interval, EMPTY } from "rxjs";
import { windowToggle, mergeAll } from "rxjs/operators";
const interval$ = interval(2000);
const openings = interval(2000);
const result = interval$.pipe(
  windowToggle(openings, i => (i % 3 === 0 ? interval(500) : EMPTY)),
  mergeAll()
);
result.subscribe(x => console.log(x));

Имаме interval$ Observable, който излъчва число на всеки 2 секунди. Тогава имаме същата наблюдаема за openings. Излъчваните стойности за interval$ са pipe d към оператора windowToggle, който има openings Observable като първи аргумент, който се излъчва на всеки 2 секунди. Така че започваме нов прозорец на всеки 2 секунди.

Тогава имаме втората функция:

i => (i % 3 === 0 ? interval(500) : EMPTY)

за затваряне на прозореца, когато въведената стойност не се дели на 3. Това означава, че получаваме всеки 3 стойности, излъчвани от interval$, регистрирани.

прозорецКога

Операторът windowWhen разклонява източника Observable с помощта на функцията closingSelector, която връща Observable, за да затвори прозореца.

Взема функцията closingSelector, която приема стойността, излъчвана от openings, и връща Observable, която излъчва сигнала next или complete, за да затвори прозореца.

Например, можем да го използваме, както следва:

import { interval } from "rxjs";
import { mergeAll, take, windowWhen, map } from "rxjs/operators";
const interval$ = interval(2000);
const result = interval$.pipe(
  windowWhen(() => interval(Math.random() * 4000)),
  map(win => win.pipe(take(2))),
  mergeAll()
);
result.subscribe(x => console.log(x));

В кода по-горе имаме interval$, който излъчва число на всеки 2 секунди. Тогава излъчената стойност е pipe d към оператора windowWhen, който има closingSelector ve:

() => interval(Math.random() * 4000)

Това означава, че прозорецът ще се затвори и ще се отвори отново Math.random() * 4000 милисекунди.

Трябва да сме сигурни, че някои числа се излъчват по-бързо от други.

Операторът windowCount разклонява стойностите на източника Observable като вложени Observable, като всяка от тях излъчва най-много windowSize събития. windowSize е размерът на всеки прозорец.

Операторът windowTime връща Observable, който излъчва прозорци от елементи с периода на прозореца, зададен от windowTimeSpan. windowTimeSpan задава времето за отворен прозорец.

windowToggle разклонява стойностите на източника Observable като вложени Observable, започвайки от излъчване от openings и завършвайки с closingSelector излъчвания. Функциите openings и closingSelector са Observable, които контролират отварянето и затварянето на прозореца съответно за всяка Observable.

Операторът windowWhen разклонява източника Observable с помощта на функцията closingSelector, която връща Observable, за да затвори прозореца.