У меня есть потребительское приложение kafka, в котором метод слушателя аннотируется аннотацией @Kafkalistener. Я предоставил названия тем и контейнер в качестве входных данных для этой аннотации. Этот метод имеет аргументы типа Сообщение, а также Подтверждение. Этот метод прослушивает любые сообщения kafka и после получения обрабатывает их и сохраняет в БД. После этого я фиксирую смещение вручную, используя метод accept.acknowledge (). Теперь, если перед этой строкой возникнет какое-либо исключение, смещение не будет зафиксировано вручную. Но я видел, что для свойства контейнера ackOnError по умолчанию установлено значение true. Я хочу получить то же сообщение и обрабатывать его до тех пор, пока оно не будет успешным, и не хочу перемещать указатель вперед, что при перемещении может привести к потере сообщений. Для этого я планирую использовать следующий подход.
1) Класс слушателя будет реализовывать интерфейс ConsumerSeekAware. На ConsumerSeekCallback будет ссылаться экземпляр ThreadLocal, и с его помощью будет вызываться метод поиска для того же сообщения, которое не удалось обработать.
2) Установите max-poll-records равным 1.
Пожалуйста, помогите мне решить, правильный ли это подход. Я не использую Spring kafka 2.x, а скорее 1.2.x. Кроме того, в настоящее время я установил для свойства auto.commit.offset значение false, а также для ackMode значение manual.