Я пытаюсь передать результат PersistenceQuery с помощью akka-http в качестве SSE, но кажется, что когда http-соединение закрывается от клиента, поток PersistenceQuery по-прежнему периодически попадает в серверную часть события.
// Http part
complete {
source(id)
.map(e => e.event) // other transformations
.map(e => ServerSentEvent(m.toString))
.keepAlive(4 seconds, () => ServerSentEvent.heartbeat)
}
// source
def source(id: UUID)(implicit system: ActorSystem, materializer: ActorMaterializer)= {
import system.dispatcher
val journalQuery = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
val futureSrcGraph: RunnableGraph[Future[Source[EventEnvelope, NotUsed]]] =
journalQuery.currentEventsByPersistenceId(id.toString, 0, Long.MaxValue)
.map(_.sequenceNr)
.toMat(Sink.last)(Keep.right)
.mapMaterializedValue(fs => fs.recoverWith {
case _ => Future { 0L } // assume we start at 1
}.map(s => journalQuery.eventsByPersistenceId(id.toString, s + 1, Long.MaxValue)))
Source.fromFutureSource(futureSrcGraph.run())
Таким образом, это в основном работает, единственная проблема заключается в том, что поток никогда не заканчивается, или так кажется. Я пробовал это как с CassandraReadJournal, так и с LevelDb
Пример вывода журнала:
[DEBUG] [06/18/2018 10:52:16.774] [sys-cassandra-plugin-default-dispatcher-17] [EventsByPersistenceIdStage(akka://sys)] EventsByPersistenceId [c6031a8a-db71-4dcb-9d4f-f140faa2f4c4] Query from seqNr [6] in partition [0]
[DEBUG] [06/18/2018 10:52:16.790] [sys-cassandra-plugin-default-dispatcher-17] [EventsByPersistenceIdStage(akka://sys)] EventsByPersistenceId [c6031a8a-db71-4dcb-9d4f-f140faa2f4c4] Query took [15] ms (empty)
[DEBUG] [06/18/2018 10:52:16.790] [sys-cassandra-plugin-default-dispatcher-17] [EventsByPersistenceIdStage(akka://sys)] EventsByPersistenceId [c6031a8a-db71-4dcb-9d4f-f140faa2f4c4] Query from seqNr [6] in partition [1]
[DEBUG] [06/18/2018 10:52:16.796] [sys-cassandra-plugin-default-dispatcher-17] [EventsByPersistenceIdStage(akka://sys)] EventsByPersistenceId [c6031a8a-db71-4dcb-9d4f-f140faa2f4c4] Query took [5] ms (empty)
[DEBUG] [06/18/2018 10:52:19.768] [sys-cassandra-plugin-default-dispatcher-17] [EventsByPersistenceIdStage(akka://sys)] EventsByPersistenceId [c6031a8a-db71-4dcb-9d4f-f140faa2f4c4] Query from seqNr [6] in partition [0]
[DEBUG] [06/18/2018 10:52:19.784] [sys-cassandra-plugin-default-dispatcher-17] [EventsByPersistenceIdStage(akka://sys)] EventsByPersistenceId [c6031a8a-db71-4dcb-9d4f-f140faa2f4c4] Query took [15] ms (empty)
[DEBUG] [06/18/2018 10:52:19.784] [sys-cassandra-plugin-default-dispatcher-17] [EventsByPersistenceIdStage(akka://sys)] EventsByPersistenceId [c6031a8a-db71-4dcb-9d4f-f140faa2f4c4] Query from seqNr [6] in partition [1]
[DEBUG] [06/18/2018 10:52:19.790] [sys-cassandra-plugin-default-dispatcher-17] [EventsByPersistenceIdStage(akka://sys)] EventsByPersistenceId [c6031a8a-db71-4dcb-9d4f-f140faa2f4c4] Query took [6] ms (empty)
[DEBUG] [06/18/2018 10:52:22.765] [sys-cassandra-plugin-default-dispatcher-17] [EventsByPersistenceIdStage(akka://sys)] EventsByPersistenceId [c6031a8a-db71-4dcb-9d4f-f140faa2f4c4] Query from seqNr [6] in partition [0]
[DEBUG] [06/18/2018 10:52:22.772] [sys-cassandra-plugin-default-dispatcher-17] [EventsByPersistenceIdStage(akka://sys)] EventsByPersistenceId [c6031a8a-db71-4dcb-9d4f-f140faa2f4c4] Query took [6] ms (empty)
[DEBUG] [06/18/2018 10:52:22.772] [sys-cassandra-plugin-default-dispatcher-17] [EventsByPersistenceIdStage(akka://sys)] EventsByPersistenceId [c6031a8a-db71-4dcb-9d4f-f140faa2f4c4] Query from seqNr [6] in partition [1]
[DEBUG] [06/18/2018 10:52:22.790] [sys-cassandra-plugin-default-dispatcher-17] [EventsByPersistenceIdStage(akka://sys)] EventsByPersistenceId [c6031a8a-db71-4dcb-9d4f-f140faa2f4c4] Query took [17] ms (empty)
И так будет вечно.
Я также попытался опустить Source.fromFutureSource и просто запустить journalQuery.eventsByPersistenceId с теми же результатами.
Что я делаю неправильно?