Не фиксировать смещение при возникновении исключения в методе прослушивателя Kafka

У меня есть потребительское приложение kafka, в котором метод слушателя аннотируется аннотацией @Kafkalistener. Я предоставил названия тем и контейнер в качестве входных данных для этой аннотации. Этот метод имеет аргументы типа Сообщение, а также Подтверждение. Этот метод прослушивает любые сообщения kafka и после получения обрабатывает их и сохраняет в БД. После этого я фиксирую смещение вручную, используя метод accept.acknowledge (). Теперь, если перед этой строкой возникнет какое-либо исключение, смещение не будет зафиксировано вручную. Но я видел, что для свойства контейнера ackOnError по умолчанию установлено значение true. Я хочу получить то же сообщение и обрабатывать его до тех пор, пока оно не будет успешным, и не хочу перемещать указатель вперед, что при перемещении может привести к потере сообщений. Для этого я планирую использовать следующий подход.

1) Класс слушателя будет реализовывать интерфейс ConsumerSeekAware. На ConsumerSeekCallback будет ссылаться экземпляр ThreadLocal, и с его помощью будет вызываться метод поиска для того же сообщения, которое не удалось обработать.

2) Установите max-poll-records равным 1.

Пожалуйста, помогите мне решить, правильный ли это подход. Я не использую Spring kafka 2.x, а скорее 1.2.x. Кроме того, в настоящее время я установил для свойства auto.commit.offset значение false, а также для ackMode значение manual.


person jackbistro    schedule 10.10.2019    source источник


Ответы (1)


1.2.x больше не поддерживается; вам следует как можно скорее выполнить обновление до 1.3.10; он имеет гораздо более простую модель потоковой передачи благодаря KIP-62.

Текущая версия - 2.3.0.

2.0.1 представил SeekToCurrentErrorHandler именно для этой цели.

person Gary Russell    schedule 10.10.2019
comment
Извините, 1.3.10 - это последняя версия 1.3.x. - person Gary Russell; 11.10.2019