Вопросы по теме 'akka-stream'
Как можно непрерывно материализовать потоки Akka?
Я использую Akka Streams в Scala для опроса из очереди AWS SQS с помощью AWS Java SDK . Я создал ActorPublisher , который удаляет сообщения из очереди через двухсекундный интервал:
class SQSSubscriber(name: String) extends...
1551 просмотров
schedule
16.04.2024
Как создать Source и вставлять в него элементы вручную?
Я хочу создать пользовательскую StatefulStage , который должен работать как метод groupBy и испускать элементы Source[A, Unit] , но я не понимаю, как создать экземпляр Source[A, Unit] и передать ему входящий элемент. Вот заглушка:
class...
433 просмотров
schedule
28.05.2024
В akka-http должны ли объекты ответа потребляться в приемнике, если ответ отличается от 200?
На этом http://doc.akka.io/docs/akka/2.4.9/scala/http/client-side/request-level.html#Future-Based_Variant внизу есть предупреждение:
Обязательно используйте объекты ответа dataBytes:Source[ByteString,Unit] , например, подключив его к...
1486 просмотров
schedule
11.12.2022
Использование Akka Http для нескольких привязок
Использовать akka http для привязки к порту, а затем маршрутизировать входящие соединения достаточно просто, учитывая документация .
Один нерешенный вопрос заключается в том, как связать несколько портов для разных маршрутов. Если у меня есть...
1047 просмотров
schedule
06.06.2024
Akka HTTP Websocket, как идентифицировать соединения внутри актора
Я работаю над простой многопользовательской игрой в scala, которую я хотел бы открыть через веб-сокеты для клиентов JS.
Вот мой класс WebsocketServer
class WebsocketServer(actorRef: ActorRef, protocol: Protocol, system: ActorSystem,...
1409 просмотров
schedule
06.11.2023
Как при работе с потоками Akka группировать по строкам, содержащим разные значения?
Немного поясню свой вопрос:
Вот мой код
def averagePerAttributeFlow[T] = Flow[T]
.groupBy(10000, {
case User(id, location, age) =>
location match {
case Location(city,state,country) =>
println("MATCHING BY CITY")...
175 просмотров
schedule
23.05.2024
Что означают параметры типа Source‹Out,Mat›?
Я пытаюсь понять тип источника для потоков Akka, указанный здесь .
К сожалению, документация и примеры, которые я нашел, не объясняют, что на самом деле означает каждый из параметров типа. Я предполагаю, что Out - это тип, который испускает...
315 просмотров
schedule
14.12.2023
Поток Akka Streams для обработки результатов с разбивкой на страницы не завершается
Я хотел бы реализовать поток для обработки результатов с разбивкой на страницы (например, базовая служба возвращает некоторые результаты, но также указывает, что доступны дополнительные результаты, делая другой запрос, передавая, например, курсор)....
997 просмотров
schedule
07.04.2024
Отправка метаданных вместе с потоком Akka
Вот мой предыдущий вопрос: Отправлять данные из InputStream через поток Akka/Spring
Мне удалось отправить сжатый и зашифрованный файл через поток Akka. Теперь я ищу способ транспортировки метаданных вместе с данными, в основном с именем файла и...
265 просмотров
schedule
01.01.2024
Поток Akka: обработка CSV с заголовком
Чтение файлов CSV с использованием Akka Streams - на основе этого вопроса. У меня есть чтение CSV с использованием Akka Streams. Теперь мне нужно обработать его построчно, но мне также нужно знать, как назывались заголовки. Есть варианты?
УПД....
1557 просмотров
schedule
25.04.2024
Akka HTTP с использованием демаршаллера ответа
Я создаю конвейер данных, используя потоки Akka и Akka HTTP. Вариант использования довольно прост: получить веб-запрос от пользователя, который сделает две вещи. Сначала создайте сеанс, вызвав сторонний API, во-вторых, зафиксируйте этот сеанс в...
545 просмотров
schedule
28.09.2022
Выведите два варианта будущего, когда они завершатся в Play 2.5.
Я смиренно возвращаюсь к сообществу, поскольку сейчас я явно слишком глубоко увяз.
Итак, я пытаюсь вернуть два фьючерса (в Play 2.5.2 в Scala) на экран в виде модулей, которые появляются на экране после их завершения. Я пробовал довольно много...
51 просмотров
schedule
05.03.2024
Как сохранить материализованную ценность в пользовательском приемнике?
Не могли бы вы рассказать мне, как сохранить материализованный Future[Int] в таком примере?
val test: Sink[Int, NotUsed] =
MergeHub.source[Int].grouped(100).to(Sink.fold(0L) { case (count, items) => count + items.sum }).run()
Я хотел...
172 просмотров
schedule
18.02.2024
Как работает дроссельная заслонка akka stream?
def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): Repr[Out]
Означает ли maximumBurst количество элементов, которые могут быть обработаны одновременно?
3070 просмотров
schedule
10.11.2022
Что означает материализованное значение и почему оно отличается в приведенной ниже логике для одного и того же графика?
Я новичок в Akka Stream и только на прошлой неделе начал читать его документы. Я могу понять большинство концепций, но мне трудно понять, что означает Materialized Value в потоке Akka и каково его значение?
Если бы кто-нибудь мог объяснить мне...
2669 просмотров
schedule
02.11.2023
Группировка элементов в Scala/Akka Streams
Предположим, у меня есть источник разных фруктов, и я хочу вставить их количество в базу данных.
Я могу сделать что-то вроде этого:
Flow[Fruits]
.map { item =>
insertItemToDatabase(item)
}
Но это явно медленно — зачем вставлять в...
1477 просмотров
schedule
26.09.2022
Передача вывода потока в вещание в Akka Streams Graph
Я пытаюсь написать график Akka Stream. Код, который я написал,
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink1, sink2)((_, _)) { implicit builder =>
(sink1, sink2) =>
import GraphDSL.Implicits._
val bcast =...
340 просмотров
schedule
11.01.2024
Потоковый тест Akka с помощью TestKit и ScaleTest
Я получаю NullPointerException при тестировании своего потокового приложения akka с помощью scalatest и не понимаю, почему ... Я, вероятно, что-то пропустил в Akka Streams, я просто ныряю в это.
Я использую общую структуру кода для масштабирования...
1622 просмотров
schedule
19.05.2024
Akka Distributed Pub/Sub противодавление
Я использую Akka Distributed Pub/Sub и имею одного издателя и подписчика. Мой издатель намного быстрее, чем подписчик. Есть ли способ замедлить работу издателя после определенного момента?
Код издателя:
public class Publisher extends...
771 просмотров
schedule
06.12.2022
Преобразование данных Slick Streaming и отправка фрагментированного ответа с помощью Akka Http
Цель состоит в том, чтобы выполнить потоковую передачу данных из базы данных, выполнить некоторые вычисления для этого фрагмента данных (это вычисление возвращает Future некоторого класса case) и отправить эти данные в виде ответа по фрагментам...
700 просмотров
schedule
11.04.2024