Дублирование доставки сообщений Simple-Kafka-consumer

Я пытаюсь реализовать простое приложение Producer -> Kafka -> Consumer на Java. Я могу успешно создавать и потреблять сообщения, но проблема возникает, когда я перезапускаю потребитель, когда некоторые из уже потребленных сообщений снова получают потребитель от Kafka (не все сообщения, а некоторые из последних). потребляемые сообщения).

Я установил autooffset.reset=largest в своем потребителе, а мое свойство autocommit.interval.ms установлено на 1000 миллисекунд.

Является ли эта «повторная доставка некоторых уже использованных сообщений» известной проблемой или есть какие-то другие настройки, которые я здесь упускаю?

По сути, есть ли способ гарантировать, что ни одно из ранее потребляемых сообщений не будет получено/потреблено потребителем?


person Breonna Calderon    schedule 30.04.2013    source источник


Ответы (1)


Kafka использует Zookeeper для хранения смещений потребителей. Поскольку операции Zookeeper довольно медленные, не рекомендуется фиксировать смещение после использования каждого сообщения.

Можно добавить обработчик завершения работы для потребителя, который будет вручную фиксировать смещение темы перед выходом. Однако это не поможет в определенных ситуациях (например, сбой jvm или kill -9). Чтобы защититься от таких ситуаций, я бы посоветовал реализовать пользовательскую логику фиксации, которая будет фиксировать смещение локально после обработки каждого сообщения (файла или локальной базы данных), а также фиксировать смещение в Zookeeper каждые 1000 мс. При запуске потребителя следует запрашивать оба этих местоположения, и в качестве смещения потребления следует использовать не более двух значений.

person Wildfire    schedule 01.05.2013
comment
это может показаться глупым, но если мы скажем, например, реализовать пользовательскую логику фиксации, то можно ли управлять смещением для каждого сообщения. Например, если у меня есть два сообщения со значением метки времени, я бы хотел установить смещение на основе метки времени. Поэтому, если вторая запись имеет более раннюю временную метку, назначенное ей смещение должно быть меньше, чем у другой. Поэтому при потреблении я буду получать сообщения, которые уже отсортированы. - person user2720864; 09.09.2013