Приложенията често са проектирани с включен дроселиращ механизъм. Понякога искаме да ограничим броя на заявките, за да подобрим сигурността и производителността на нашето приложение. Понякога вашето приложение не може да отговори на повече от определен брой връзки. Например, ако сте поискали съобщението възможно най-бързо от опашката и не сте ограничили броя на връзките, вашето приложение скоро ще изчерпи своя глупак за връзка и ще се сблъска с грешка при отказ на услуга. В този сценарий наличието на механизъм, който ограничава броя на приложенията, които се обработват едновременно, ще помогне за подобряване на производителността на вашето приложение.

В тази статия искам да споделя как можем да създадем дроселиращ механизъм с паралелна примитивна опашка на FS2.

Преди да започнем, искам да си представите приложение, което непрекъснато ще анкетира елементи отгоре. След това използваме FS2, за да се справим с дроселиране/обратно налягане, като предоставяме максималния размер на буфера, с който ресурсите могат да работят едновременно. Когато вътрешната опашка е пълна, тя няма да постави съобщението в опашка, докато някои от задачите не бъдат завършени.

По същество потребителят на приложението може да го използва по следния начин:

Следователно, когато максималният размер надвиши 100, той ще спре запитването на елемента, докато вътрешната опашка има малко място.

Има 2 части в процеса на създаване на това:

  • Потребителят е тип клас, който ще се абонира за upstream чрез постоянно поставяне на стойността във вътрешна опашка.
  • Абонатът е клас тип, който ще обвие потребителя и ще извади частта от вътрешната опашка и ще обработи тази стойност.

В зависимост от случая на използване на приложението, можем да капсулираме частта за абонат или частта за потребител. В тази статия това ще бъде Потребителят. Това означава, че потребителят може да посочи от какво нагоре по веригата неговата функция иска да анкетира и те получават достъп до резултата от абоната. „Последната“ статия капсулира частта за абонатите.

Консуматор

Потребителят ще се абонира за upstream. Следователно, ние искаме функция, която е като тази:

Функцията получава всяка стойност от upstream и enqueue въпроса към вътрешната опашка.

Нека създадем първоначалния метод за абонамент. Трябва да създадем екземпляра на класа тип Consumer, като позволим на нашия извикващ да инициализира вътрешната опашка и да ги инжектира в екземпляра Consumer.

Ще използваме NoneTerminatedQueue за прекратяване на опашката, след като спирките нагоре по веригата изпратят съобщение до потребителя. Потребителят може да каже на абоната да спре потока.

Това звучи ли като придобиване на ресурси?

Ти си прав! По същество искаме да придобием ресурс и искаме да гарантираме, че ще се изпълни някакво действие за почистване, ако ресурсът бъде придобит. Следователно ще създадем помощния метод на ресурсите за subscribe:

Ще получим стойността от upStream и enqueue1 в нашата вътрешна опашка. След това ще compile.drain и ще източим целия вход, идващ от upStream. Ако цялата информация е изтощена или възникнат грешки по време на изчислението, ресурсът ще се изчисти от enqueue1 до None към нашата вътрешна опашка (Абонат). След това абонатът спира своя поток.

Ето как наричаме Consumer:

start тук ще започне fiber. Ако не сте поставили start, тогава целият процес ще бъде последователен, което означава, че ще постави цялата стойност в опашка, след което ще я премахне от опашката. Ако нашата вътрешна опашка е пълна, тя ще виси там. Следователно наличието на start ще изпълни subscribe в друга IO нишка.

Абонат

Искаме Subscriber да проучва многократно и да връща Stream[F, A] обратно на повикващия.

Следователно можем да създадем клас тип, който има will pollRepeat:

Подобно на Consumer ще трябва да създадем екземпляр на Subscriber, като имаме максималния размер на опашката и upstream като параметър:

Трябва да се абонираме за upstream, да изстреляме абоната в друга нишка и да инстанцираме абоната.

Създаваме вътрешната опашка boundedNoneTerminated с maxBufferSize, предоставен от повикващия. След това правим нашия Consumer с помощта на класа тип Consumer и се абонираме за upstream и започваме с друго влакно. Връщаме опашката обратно, за да можем да я свържем с екземпляра Subscriber.

Тогава, докато създаваме consumer, можем да свържем потребителя с Subscriber. Като този:

Това е програмата за използване на екземпляра на абоната и за използването му като всеки дроселиращ механизъм във вашето приложение:

Ще трябва да се обадите на unsafeRunSync в края:

subscriberExample.unsafeRunSync

Заключение

Добавянето на дроселиращ механизъм може да бъде предизвикателство, особено ако трябва да го направите в едновременна среда. За щастие, с помощта на FS2, конструирането на дроселиращ механизъм за всяко приложение може да се направи в няколко реда кодове.

Създаваме клас тип Consumer, за да се абонирате за всеки източник. След това използваме Subscriber за постоянно enqueue и dequeue по едновременен начин. Можем да гарантираме почистването на данните от опашката, преди да спрем потока с придобиване на ресурси.

Надявам се да намерите тази публикация за полезна, за да научите повече за FS2, Scala или функционалното програмиране като цяло. Ако има нещо, което може да причини някаква грешка, не се колебайте да го посочите, за да мога и аз да се уча от вас.

Целият изходен код е в GitHub.

Благодаря, че прочетохте! Ако тази публикация ви харесва, не се колебайте да се абонирате за моя бюлетин, за да получавате известия за есета за кариера в технологиите, интересни връзки и съдържание!

Можете да ме следвате и в Medium за повече публикации като тази.

Първоначално публикувано на https://edward-huang.com.