Java 8 Streams бяха изключително добре проектирани. Самият API предлага кратък (учтив начин да се каже разочароващо ограничен) набор от функции. Може би това е така, защото екипът зад него беше фокусиран върху добавянето на паралелизъм на данните, което може би ограничи техните възможности по отношение на функциите за последователно поточно предаване.

Оказва се обаче, че тази сбитост е предимство, а не слабост. Чрез създаването на прост API, който прави едно нещо наистина добре (това е да представлява напълно мързелив, потенциално безкраен поток от данни), те създадоха нещо, което също беше безкрайно разширяемо от общността.

В тази статия се надявам да ви покажа 10 важни функции, които не са включени в Stream API, но също и 10 точки за разширение, които ви предлагат тези функции. Ако сте разочаровани от многословността на основния API при решаването на конкретен проблем, не се страхувайте - много вероятно има разширен API, който ще направи решението много по-просто и по-елегантно (ще използваме примери от cyclops-react Streaming API).

Повечето разработчици изглежда използват потоци просто за трансформиране на колекции, но те са способни на много повече..

Напълно мързелив тръбопровод за обработка на потенциално безкраен поток от данни е невероятно мощна структура от данни и вижте колко мощна е, нека направим това, което всички изглежда правят в блоговете в наши дни – нека изградим микроуслуга.

За какво се нуждаем от нашата микроуслуга

  1. Четене на данни от услугата за съхранение S3 на Amazon
  2. Преобразувайте го в друга форма
  3. Запишете трансформираните данни обратно в AWS S3

Можем да започнем с дефиниране на поток по този начин..

Сега имаме поток, който ще работи безкрайно и непрекъснато, но има много случаи, когато това не е това, което искаме. Искаме ли да удряме нашата услуга за данни непрекъснато или да използваме някаква функция, базирана на времето, за да контролираме колко често се извършват заявки?

Нека да планираме потока да се изпълнява на всеки 60 секунди. Можем да използваме StreamUtils в cyclops-react, за да направим това

Липсваща функция 1: Планиране

Други полезни оператори тук включват onePer, xPer и scheduling въз основа на cron изрази.

Проблем, с който обаче бързо ще се сблъскаме, е поведението, когато възникне изключение. Подписът на метода на pollS3 вероятно изглежда по следния начин

public Data pollS3() throws AmazonServiceException

Ако поток хвърли изключение, изпълнението на целия поток е неуспешно и в този случай нашият планиран поток ще спре да се изпълнява.

Липсваща функция 2: Възстановяване на грешка!

Отново можем да използваме оператора за възстановяване в StreamUtils, той ни позволява да хванем изключение в поток и да го обработим.

Използването на статични методи за разширение скоро ще стане нечетливо (освен ако не използвате макрос на метода за разширение на Lombok — който може да работи добре в Eclipse, но не толкова добре в други IDE). По-добра алтернатива е да се използва специален клас за разширение. В cyclops-react създадохме такъв, наречен ReactiveSeq.

Обърнете внимание, че повечето от операторите, обсъдени в тази статия, са достъпни за директна употреба в JDK 8 Streams (дори тези, които използват функционалност за реактивни потоци в класа StreamUtils).

ReactiveSeq разширява невероятно мощния тип Seq от jooλ. В този случай Seq е съкращение от Sequential. Чрез изоставянето на паралелизма на данни на Java 8 Streams е възможно да се изгради супер мощен Sequential Streaming API. Seq на jooλ въвежда много много мощни методи за разширение в допълнение към 10-те липсващи функции, които разглеждаме днес в тази статия (които са предимно добавки от cyclops-react)!

jooλ прави манипулирането на данни в паметта с потоци много по-лесно и много от неговите по-усъвършенствани функции са вдъхновени от по-мощните оператори, налични в SQL (може би не е изненадващо, тъй като идва от Data Geekery, хората зад Jooq).

ReactiveSeq разширява Seq и също така внедрява API за реактивни потоци. Целта е да добавим допълнителни разширения, за да можем да работим с данни, които може да пристигнат асинхронно, да прилагаме функции, които могат да успеят или да се провалят към тези данни, да свързваме безпроблемно различни потоци на обработка, да се справяме с несъответствията при обработката между производителите на данни и потребителите, както и да предоставяме алтернативен механизъм за паралелизъм на данните, като същевременно запазва същия богат набор от оператори.

Липсваща функция 3: Повторни опити

Вече видяхме оператора за възстановяване, който ни позволява да прихващаме и Exception и да се възстановяваме от него. В нашата Microservice, след като изтеглим и обработим данните — би било наистина жалко да ги изпуснем, само защото не можахме да ги запазим на последния етап.

Можем да се уверим, че правим повторен опит при неуспех, като използваме оператора за повторен опит.

Обърнете внимание, че можем да използваме по-подробен оператор за претоварен повторен опит, за да посочим параметрите за отлагане и общия брой повторни опити, които бихме искали.

Липсваща функция 4: Събития с грешки

API за реактивни потоци позволява на потребителите да слушат различни събития, генерирани по време на обработката на събитието — като потока от данни на елемента или потока от грешки и да получават събитие за завършване. Със стандартните потоци на Java 8 получаваме само потока от данни на елемента — всяка грешка е причинила прекратяването на потока. С ReactiveSeq можем да използваме forEachWithError, за да се абонираме както за събития с данни, така и за грешки (вижте StreamUtils#forEachWithError, за да се абонирате за тези събития със стандартни JDK 8 потоци).

Липсваща функция 5: Горещи потоци

Горещите потоци са потоци, които се изпълняват в момента, към които можем да се свържем (това прави операторът за свързване!). scheduleFixedDelay създава HotStream, на всеки 60 секунди в нашия пример Stream ще излъчва данни и ние можем да слушаме тези емисии, като свързваме други потоци (колкото пожелаем!). По време на тежка обработка в дадена услуга може да поискаме да обработим допълнително малка извадка от записи, може би за да регистрираме някои полезни показатели или да захраним процес надолу по веригата. Горещите потоци са идеални за това.

В този пример задействаме HotSteam, изпълняващ се на отделна нишка чрез оператора hotStream, и създаваме нов свързан поток чрез connect().

Операторът за debounce казва, че искаме да обработим само x количество излъчени елементи в рамките на определения период от време. В този случай ще отпечатаме по една точка от данни на ден.

Операторът hotStream ни позволява да създадем асинхронно изпълняващ се поток — но това не е единственият начин да направим това.

Липсваща функция 6: Асинхронно изпълнение

Операторът futureOperations на ReactiveSeq позволява на потребителите да посочат Executor (може би ExecutorService, който управлява Thread Pool), на който потокът ще бъде изпълнен. След като бъде извикана терминална операция, потребителят получава CompletableFuture като резултат. Текущата нишка може да продължи да работи безпрепятствено.

С по-мощен API за последователно поточно предаване имаме алтернативен механизъм за паралелно поточно предаване. За задачи, обвързани с процесора, можем да разпространяваме последователни потоци между нишки (в идеалния случай по един на ядро ​​на процесора)

С ReactiveSeq можем да дефинираме поток, който се изпълнява асинхронно така.

И може да го разпредели между нишки в for цикъл

Ние не сме ограничени до изпълнение на нашите потоци асинхронно в пул от нишки, можем също да ги предадем на цикъл на събитие Vert.x за изпълнение (стига да можем да се съобразим с дефиницията на интерфейса java.util.Executor, ние сме добри).

Липсваща функция 8: Мързеливи терминални операции

Докато по-голямата част от тръбопровода на Java 8 Stream е дефиниран мързеливо, терминалните операции обикновено не са. Всъщност те са спусъкът за изпълнение на тръбопровода. Има случаи, обикновено когато конвейерът Stream изпълнява някаква скъпа функция, където би било по-добре да задействаме оценката, след като знаем, че данните са необходими.

Тъй като потоците се преминават веднъж, може да не е добра идея да предавате самия поток като тип връщане на метод или да бъде обвит в доставчик. Операторът lazyOperations в cyclops-react ни позволява да дефинираме лениво изпълнявани терминални операции. Върнатият тип е мемоизирано (кеширано) Eval, което представлява мързелива оценка.

Липсваща функция 9: Прехвърляне на данни в потоци

Потоците на Java 8 са страхотни за работа с данни, които вече имаме, но биха били много по-полезни, ако имаше начин, по който можем асинхронно да им доставяме данни. В cyclops-react създадохме StreamSource точно за тази цел. StreamSource ни позволява да изпращаме данни в поток на отделни нишки, произвеждащи данни от нишката, върху която се изпълнява потокът.

Можем да използваме StreamSource, за да генерираме и управляваме JDK потоци, ReactiveSeq и дори мощни FutureStreams (вижте Липсваща функция 10) и много подобно на услугите за разпределени съобщения StreamSource може да се държи като опашка от информация (избутване на данни към точно един поток) или като тема (избутване на данни към множество свързани слушащи потоци).

Липсваща характеристика 10: Мощен паралелизъм

Java 8 Streams предлага намален набор от оператори, за да поддържа паралелизъм на данните и ние видяхме, че е възможно да се изгради много богат и мощен последователен API. Има и много начини, по които можем да изградим паралелни потоци и след като изградихме страхотно мощен последователен API, би било страхотно да можем да запазим тази мощност, но в паралелен поток. В cyclop-react въвеждаме концепцията за FutureStream.

FutureStreams е API, който управлява поток от бъдещи задачи. Нашият тип LazyFutureStream разширява ReactiveSeq и така има всички същите оператори (и дори добавя някои), но слоеве в паралелизъм отгоре на това.

Изграждането на поток от фючърси е доста лесно за начало, но скоро може да стане неудобно. В нашите методи map и flatMap можем просто да делегираме на еквивалентния метод на Future, който управляваме, но нещата започват да стават по-трудни, когато се опитаме да приложим филтриране (без значение някои от по-мощните оператори, налични в ReactiveSeq).

Можем да дефинираме FutureStream, сякаш е нормален JDK поток (с много допълнителни мощни оператори)

и го накарайте да се изпълнява паралелно.

Резюме

Можем да продължим и да добавим още липсващи функции, не сме споменали групиране/плъзгащи се прозорци, покрито вмъкване и изтриване, разширени оператори за вземане/пускане, компресиране или абонаменти и много други. Но се надявам, че успях да се убедя, че дизайнът на Stream API, макар и ограничен, отваря свят от възможности за разширение, с които библиотеки като „cyclops-react“, разширяващи „jooλ“ и независимо „StreamEx“, са се справили.

В cyclops-react ние задълбочихме обработката на потока, защото ни позволява да използваме API и стил на кодиране, познати на разработчиците на Java 8, по-последователно в нашите приложения. Функционалният стил, въведен в Stream API, обикновено води до приложения, които са по-лесни за тестване, с по-малко движещи се части, които по своята същност са по-успоредни.

Създадохме структури от данни като async.Queues и Topics специално, за да помогнем на plumb Streams заедно да реализират тази цел.

Нещо, което може да изглежда сравнително просто в сравнение с други реализации на Stream, в крайна сметка се оказва нещо невероятно полезно и мощно.

За повече информация разгледайте cyclops-react.io и ръководството за потребителя