Kafka Streams - сохранять сообщения по метке времени/последовательности?

Я получаю сообщения в потоке Kafka. Они определяются идентификатором пользователя. При поступлении им присваивается порядковый номер и отметка времени. Срок действия сообщений истекает через 15 минут. Пользователь может запрашивать новые сообщения на основе заданного времени (до 15 минут) или последовательности.

То, что у меня изначально есть, это что-то вроде этого:

` StreamsBuilder streamsBuilder = new StreamsBuilder();

  KStream<String, Message> inboundStream = streamsBuilder.stream("incoming.topic");
  messageSupplier = Stores.persistentKeyValueStore("user.messages");

  KTable<String, MessageCache> messageTable = inboundStream
      .filter(this::userExists)
      .peek(this::recordInboundMessage)
      .map(this::markMessage)       // add sequence/timestamp
      .groupByKey()
      .aggregate(this::createMessageCache,
              this::addMessageToMessageCache,
              Materialized.as(messageSupplier));

  // ---> Some other setup stuff, then start the streams

`

MessageCache содержит список сообщений (удаляет сообщения с истекшим сроком действия, когда мы добавляем сообщение в кеш). Когда я получаю запрос на сообщения, я просматриваю список и отфильтровываю подходящие сообщения.

Я думал, что мог бы использовать одну из стратегий работы с окнами, но не смог найти пример, который действительно сохранял бы список сообщений.

Это лучший способ сделать это? Или я упустил что-то, что сделало бы это проще/лучше?


person Daniel Israel    schedule 18.06.2018    source источник


Ответы (1)


Это лучший способ сделать это? Или я упустил что-то, что сделало бы это проще/лучше?

Я думаю, у вас есть простое решение, использующее нативные классы Java, которые эффективно связывают приложение потоков с вашим кодом... Многое можно сказать о простоте! Единственный недостаток, который я вижу, заключается в том, что если ваши пользовательские кеши могут превысить размер вашей памяти, если ваша частота событий слишком высока. Кроме того, если вам нужна отказоустойчивость, потоковое приложение восстановит содержимое хранилища состояний в другом экземпляре приложения в случае сбоя. Но если это не проблема, то дерзайте!

Но с точки зрения того, как это сделать в контексте потокового приложения, есть несколько настроек, которые вы можете сделать для этого:

  1. Определите степень детализации пользовательских запросов, которые вы хотите поддерживать. Минуты? Секунды? Скажем минут ради спора. Окно вашего потока в соответствии с этой степенью детализации.

  2. Определите аккумулятор, подобный тому, что у вас есть, который будет принимать запись пользователя и добавлять ее в список. Что-то вроде UserRecordGroup, у которого есть List из UserRecord, и метод add(UserEvent evt), который добавит UserRecord к List.

Затем вы можете создать свое потоковое приложение, например:

KStream<String, Message> inboundStream = streamsBuilder.stream("incoming.topic");
 Materialized<String, UserRecordGroup, WindowStore<Bytes, byte[]>> userStore =
 Materialized.<String, UserRecordGroup, WindowStore<Bytes,byte[]>>as("user.messages")
  .withValueSerde(/* your serializers here */);


KTable<String, MessageCache> messageTable = inboundStream
  .filter(this::userExists)
  .peek(this::recordInboundMessage)
  .map(this::markMessage)       // add sequence/timestamp
  .groupByKey()
  .windowedBy(TimeWindows.of(ONE_MINUTE_IN_MS))
  .aggregate(UserRecordGroup::new,
            (key, value, agg) -> { agg.add(value); },
             userStore);

Наконец, если вы хотите обслуживать запросы из хранилища, вы можете

ReadOnlyWindowStore<Integer, UserRecordGroup> store =
   streams.store("user.messages", QueryableStoreTypes.windowStore());
WindowStoreIterator<UserRecordGroup> windowIterator = 
     store.fetch(pathHash, startTimestamp, endTimeStamp);

Ваш итератор будет содержать списки всех записей для разных окон; объедините эти списки вместе, и вы получите описание активности пользователя между startTimestamp и endTimestamp.

person Kyle Fransham    schedule 19.06.2018
comment
Это интересно. Спасибо за отзыв! В этом случае старые окна просто очистятся или они все еще будут там? (Мне нужно что-то делать?) Или это связано с конфигурацией сохранения темы? Некоторые пользователи будут получать сообщения очень часто, некоторые не так часто. Вы сделали полезный вывод, что может быть несколько экземпляров приложения. Вызов сообщений является длительным, поэтому, если сообщений не существует, он ожидает сообщения. Я думал об уведомлении потока, когда пришло сообщение, но если вызов находится в другом экземпляре, есть ли способ сделать это? - person Daniel Israel; 21.06.2018