Приложения часто разрабатываются с задействованным механизмом регулирования. Иногда мы хотим ограничить количество запросов, чтобы повысить безопасность и производительность нашего приложения. Иногда ваше приложение не может отвечать более чем на определенное количество подключений. Например, если вы как можно быстрее опросили сообщение из очереди и не ограничили количество подключений, ваше приложение скоро исчерпает свои ошибки подключения и столкнется с ошибкой отказа в обслуживании. В этом сценарии наличие механизма, ограничивающего количество одновременно обрабатываемых приложений, поможет повысить производительность вашего приложения.

В этой статье я хочу поделиться тем, как мы можем создать механизм регулирования с помощью параллельной примитивной очереди FS2.

Прежде чем мы начнем, я хочу, чтобы вы представили приложение, которое будет непрерывно опрашивать элементы из апстрима. Затем мы используем FS2 для управления дросселированием / противодавлением, обеспечивая максимальный размер буфера, с которым ресурсы могут работать одновременно. Когда внутренняя очередь заполнена, сообщение не будет помещено в очередь до тех пор, пока некоторые задачи не будут выполнены.

По сути, пользователь приложения может использовать его так:

Поэтому, когда максимальный размер превышает 100, он прекратит опрос элемента до тех пор, пока во внутренней очереди не появится некоторое пространство.

Процесс создания состоит из двух частей:

  • Потребитель - это тип класса, который будет подписываться на восходящий поток, постоянно помещая значение во внутреннюю очередь.
  • Подписчик - это класс типа, который будет обертывать Потребителя и извлекать фрагмент из внутренней Очереди и обрабатывать это значение.

В зависимости от варианта использования приложения мы можем инкапсулировать часть подписчика или часть потребителя. В этой статье это будет Потребитель. Это означает, что пользователь может указать, из какого восходящего потока их функция хочет опросить, и они могут получить доступ к результату от подписчика. Последняя статья инкапсулирует подписную часть.

Потребитель

Потребитель подпишется на восходящий поток. Следовательно, нам нужна такая функция:

Функция получает любое значение из upstream и enqueue дела во внутреннюю очередь.

Создадим начальный метод подписки. Нам нужно создать экземпляр класса типа Consumer, позволив вызывающей стороне инициализировать внутреннюю очередь и внедрить их в экземпляр Consumer.

Мы будем использовать NoneTerminatedQueue для завершения очереди после того, как остановки восходящего потока отправят сообщение потребителю. Потребитель может сказать подписчику остановить поток.

Это похоже на приобретение ресурсов?

Ты прав! По сути, мы хотим получить ресурс и хотим гарантировать, что какое-то действие по очистке будет запущено, если ресурс будет получен. Поэтому мы создадим вспомогательный метод ресурса для subscribe:

Мы получим значение от upStream и enqueue1 в нашу внутреннюю очередь. Затем мы compile.drain и истощим весь ввод, поступающий от upStream. Если вся информация является утечкой или во время вычислений возникают какие-либо ошибки, ресурс будет очищен enqueue1 a None в нашу внутреннюю очередь (подписчик). Затем подписчик останавливает свой поток.

Вот как мы называем Consumer:

start здесь начнется fiber. Если вы не поставили start, тогда весь процесс будет последовательным, то есть он поставит все значение в очередь, а затем исключит из очереди. Если наша внутренняя очередь заполнена, она там зависнет. Следовательно, наличие start будет выполнять subscribe в другом потоке ввода-вывода.

Подписчик

Мы хотим, чтобы Subscriber повторно опрашивал и возвращал Stream[F, A] обратно вызывающему.

Следовательно, мы можем создать класс типа с волей pollRepeat:

Подобно Consumer, нам нужно будет создать экземпляр Subscriber, указав максимальный размер очереди и восходящий поток в качестве параметра:

Нам нужно подписаться на восходящий поток, запустить подписчика в другой поток и создать экземпляр подписчика.

Мы создаем boundedNoneTerminated внутреннюю очередь с maxBufferSize, которую предоставляет вызывающий. Затем мы создаем наш Consumer с помощью класса Consumer, подписываемся на восходящий поток и начинаем с другого волокна. Мы возвращаем очередь обратно, чтобы мы могли связать ее с экземпляром Subscriber.

Затем, когда мы создаем consumer, мы можем связать потребителя с Subscriber. Нравится:

Это программа для использования экземпляра подписчика и его использования в качестве любого механизма регулирования в вашем приложении:

В конце вам нужно будет позвонить unsafeRunSync:

subscriberExample.unsafeRunSync

Заключение

Добавление механизма регулирования может быть сложной задачей, особенно если вам нужно сделать это в параллельной среде. К счастью, с помощью FS2 создание механизма регулирования для любого приложения можно выполнить с помощью нескольких строк кода.

Мы создаем класс типа Consumer для подписки на любой источник. Затем мы используем Subscriber, чтобы постоянно enqueue и dequeue одновременно. Мы можем гарантировать очистку данных очереди перед остановкой потока с получением ресурсов.

Надеюсь, вы найдете этот пост полезным для получения дополнительной информации о FS2, Scala или функциональном программировании в целом. Если есть что-то, что может вызвать какую-либо ошибку, не стесняйтесь указать на это, чтобы я также мог поучиться у вас.

Весь исходный код находится в GitHub.

Спасибо за внимание! Если вам понравился этот пост, не стесняйтесь подписаться на мою рассылку, чтобы получать уведомления об эссе о карьере в технологиях, интересных ссылках и контенте!

Вы можете подписаться на меня и подписаться на меня на Medium, чтобы увидеть больше подобных сообщений.

Первоначально опубликовано на https://edward-huang.com.