Я получаю сообщения в потоке Kafka. Они определяются идентификатором пользователя. При поступлении им присваивается порядковый номер и отметка времени. Срок действия сообщений истекает через 15 минут. Пользователь может запрашивать новые сообщения на основе заданного времени (до 15 минут) или последовательности.
То, что у меня изначально есть, это что-то вроде этого:
` StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, Message> inboundStream = streamsBuilder.stream("incoming.topic");
messageSupplier = Stores.persistentKeyValueStore("user.messages");
KTable<String, MessageCache> messageTable = inboundStream
.filter(this::userExists)
.peek(this::recordInboundMessage)
.map(this::markMessage) // add sequence/timestamp
.groupByKey()
.aggregate(this::createMessageCache,
this::addMessageToMessageCache,
Materialized.as(messageSupplier));
// ---> Some other setup stuff, then start the streams
`
MessageCache
содержит список сообщений (удаляет сообщения с истекшим сроком действия, когда мы добавляем сообщение в кеш). Когда я получаю запрос на сообщения, я просматриваю список и отфильтровываю подходящие сообщения.
Я думал, что мог бы использовать одну из стратегий работы с окнами, но не смог найти пример, который действительно сохранял бы список сообщений.
Это лучший способ сделать это? Или я упустил что-то, что сделало бы это проще/лучше?