Вопросы по теме 'akka-stream'

Как можно непрерывно материализовать потоки Akka?
Я использую Akka Streams в Scala для опроса из очереди AWS SQS с помощью AWS Java SDK . Я создал ActorPublisher , который удаляет сообщения из очереди через двухсекундный интервал: class SQSSubscriber(name: String) extends...
1551 просмотров
schedule 16.04.2024

Как создать Source и вставлять в него элементы вручную?
Я хочу создать пользовательскую StatefulStage , который должен работать как метод groupBy и испускать элементы Source[A, Unit] , но я не понимаю, как создать экземпляр Source[A, Unit] и передать ему входящий элемент. Вот заглушка: class...
433 просмотров
schedule 28.05.2024

В akka-http должны ли объекты ответа потребляться в приемнике, если ответ отличается от 200?
На этом http://doc.akka.io/docs/akka/2.4.9/scala/http/client-side/request-level.html#Future-Based_Variant внизу есть предупреждение: Обязательно используйте объекты ответа dataBytes:Source[ByteString,Unit] , например, подключив его к...
1486 просмотров
schedule 11.12.2022

Использование Akka Http для нескольких привязок
Использовать akka http для привязки к порту, а затем маршрутизировать входящие соединения достаточно просто, учитывая документация . Один нерешенный вопрос заключается в том, как связать несколько портов для разных маршрутов. Если у меня есть...
1047 просмотров
schedule 06.06.2024

Akka HTTP Websocket, как идентифицировать соединения внутри актора
Я работаю над простой многопользовательской игрой в scala, которую я хотел бы открыть через веб-сокеты для клиентов JS. Вот мой класс WebsocketServer class WebsocketServer(actorRef: ActorRef, protocol: Protocol, system: ActorSystem,...
1409 просмотров
schedule 06.11.2023

Как при работе с потоками Akka группировать по строкам, содержащим разные значения?
Немного поясню свой вопрос: Вот мой код def averagePerAttributeFlow[T] = Flow[T] .groupBy(10000, { case User(id, location, age) => location match { case Location(city,state,country) => println("MATCHING BY CITY")...
175 просмотров
schedule 23.05.2024

Что означают параметры типа Source‹Out,Mat›?
Я пытаюсь понять тип источника для потоков Akka, указанный здесь . К сожалению, документация и примеры, которые я нашел, не объясняют, что на самом деле означает каждый из параметров типа. Я предполагаю, что Out - это тип, который испускает...
315 просмотров
schedule 14.12.2023

Поток Akka Streams для обработки результатов с разбивкой на страницы не завершается
Я хотел бы реализовать поток для обработки результатов с разбивкой на страницы (например, базовая служба возвращает некоторые результаты, но также указывает, что доступны дополнительные результаты, делая другой запрос, передавая, например, курсор)....
997 просмотров
schedule 07.04.2024

Отправка метаданных вместе с потоком Akka
Вот мой предыдущий вопрос: Отправлять данные из InputStream через поток Akka/Spring Мне удалось отправить сжатый и зашифрованный файл через поток Akka. Теперь я ищу способ транспортировки метаданных вместе с данными, в основном с именем файла и...
265 просмотров
schedule 01.01.2024

Поток Akka: обработка CSV с заголовком
Чтение файлов CSV с использованием Akka Streams - на основе этого вопроса. У меня есть чтение CSV с использованием Akka Streams. Теперь мне нужно обработать его построчно, но мне также нужно знать, как назывались заголовки. Есть варианты? УПД....
1557 просмотров
schedule 25.04.2024

Akka HTTP с использованием демаршаллера ответа
Я создаю конвейер данных, используя потоки Akka и Akka HTTP. Вариант использования довольно прост: получить веб-запрос от пользователя, который сделает две вещи. Сначала создайте сеанс, вызвав сторонний API, во-вторых, зафиксируйте этот сеанс в...
545 просмотров
schedule 28.09.2022

Выведите два варианта будущего, когда они завершатся в Play 2.5.
Я смиренно возвращаюсь к сообществу, поскольку сейчас я явно слишком глубоко увяз. Итак, я пытаюсь вернуть два фьючерса (в Play 2.5.2 в Scala) на экран в виде модулей, которые появляются на экране после их завершения. Я пробовал довольно много...
51 просмотров
schedule 05.03.2024

Как сохранить материализованную ценность в пользовательском приемнике?
Не могли бы вы рассказать мне, как сохранить материализованный Future[Int] в таком примере? val test: Sink[Int, NotUsed] = MergeHub.source[Int].grouped(100).to(Sink.fold(0L) { case (count, items) => count + items.sum }).run() Я хотел...
172 просмотров
schedule 18.02.2024

Как работает дроссельная заслонка akka stream?
def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): Repr[Out] Означает ли maximumBurst количество элементов, которые могут быть обработаны одновременно?
3070 просмотров
schedule 10.11.2022

Что означает материализованное значение и почему оно отличается в приведенной ниже логике для одного и того же графика?
Я новичок в Akka Stream и только на прошлой неделе начал читать его документы. Я могу понять большинство концепций, но мне трудно понять, что означает Materialized Value в потоке Akka и каково его значение? Если бы кто-нибудь мог объяснить мне...
2669 просмотров
schedule 02.11.2023

Группировка элементов в Scala/Akka Streams
Предположим, у меня есть источник разных фруктов, и я хочу вставить их количество в базу данных. Я могу сделать что-то вроде этого: Flow[Fruits] .map { item => insertItemToDatabase(item) } Но это явно медленно — зачем вставлять в...
1477 просмотров
schedule 26.09.2022

Передача вывода потока в вещание в Akka Streams Graph
Я пытаюсь написать график Akka Stream. Код, который я написал, val graph = RunnableGraph.fromGraph(GraphDSL.create(sink1, sink2)((_, _)) { implicit builder => (sink1, sink2) => import GraphDSL.Implicits._ val bcast =...
340 просмотров
schedule 11.01.2024

Потоковый тест Akka с помощью TestKit и ScaleTest
Я получаю NullPointerException при тестировании своего потокового приложения akka с помощью scalatest и не понимаю, почему ... Я, вероятно, что-то пропустил в Akka Streams, я просто ныряю в это. Я использую общую структуру кода для масштабирования...
1622 просмотров

Akka Distributed Pub/Sub противодавление
Я использую Akka Distributed Pub/Sub и имею одного издателя и подписчика. Мой издатель намного быстрее, чем подписчик. Есть ли способ замедлить работу издателя после определенного момента? Код издателя: public class Publisher extends...
771 просмотров

Преобразование данных Slick Streaming и отправка фрагментированного ответа с помощью Akka Http
Цель состоит в том, чтобы выполнить потоковую передачу данных из базы данных, выполнить некоторые вычисления для этого фрагмента данных (это вычисление возвращает Future некоторого класса case) и отправить эти данные в виде ответа по фрагментам...
700 просмотров
schedule 11.04.2024