Как узнать последнее смещение темы Kafka, чтобы знать, когда мой читатель в курсе темы?

У меня есть сервер, который должен хранить в памяти кеш всех пользователей. Итак, предполагая, что список не будет большим - пара сотен тысяч элементов, я хотел бы использовать тему Kafka с сообщениями с ключами, где ключ - это идентификатор пользователя, чтобы сохранить текущее состояние этого списка, и приложение администратора отправит новый пользовательский объект в ту тему, когда что-то изменилось. Поэтому, когда сервер запускается, ему просто нужно прочитать все из этой темы с самого начала и заполнить его кеш.

Фаза заполнения занимает около 20-30 секунд в зависимости от подключения к Kafka, поэтому серверу не нужно подключаться к сети, пока он не прочитает все из темы, чтобы иметь актуальный кеш (все сообщения в теме на момент запуска считается современным). Но я не понимаю, как определить, прочитал ли я все из потока Kafka, чтобы уведомить другие службы о том, что кеш заполнен, и сервер может запускать серверные запросы. Я читал о высоком водяном знаке, но не вижу его в потребительском API Java.

Итак, как узнать последнее смещение темы Kafka, чтобы знать, когда мой читатель обновлен?


person makados    schedule 18.11.2015    source источник


Ответы (1)


Предполагая, что вы используете потребителя высокого уровня.

Верхний водяной знак недоступен в потребителе высокого уровня.

**As you mentioned: all the messages in the topic at the moment of start is considered up-to-date**

когда ваше приложение запускается, вы можете сделать следующее, используя SimpleConsumer Api:

  1. Найдите количество разделов в теме, отправив запрос TopicMetadataRequest любому брокеру в кластере kafka.

  2. Создайте раздел для карты lastOffset, где ключ — это раздел, а значение — это lastOffset, доступный в этом разделе.

    Map‹Integer,Integer> offsetMap = new HashMap‹>()

  3. Для каждого раздела p в теме:

    A. Найдите лидера раздела p

    B. Отправить OffsetRequest лидеру

    C. Получить lastOffset из OffsetResponse

    D. Добавьте запись в offsetMap, где ключ — это раздел p, а смещение — lastOffset.

  4. Начните читать сообщения от kafka с помощью потребителя высокого уровня:

    A. Для каждого сообщения, которое вы получаете от KafkaStream:

      AA. Get the partition && offset of the message
      BB. if( offsetMap.get(partition)<=offset) stop Reading from this steam
    

Надеюсь это поможет.

person mrnakumar    schedule 18.11.2015
comment
как вы отправляете запрос смещения лидеру? - person sebnukem; 25.02.2016
comment
@sebnukem вот пример на Scala: суть. Преобразуйте его в Java, и он должен работать. - person makados; 25.02.2016