У меня есть потребитель spring-kafka, который читает записи и передает их в кеш. Запланированная задача будет периодически очищать записи в кеше. Я хочу обновить COMMIT OFFSET только после того, как пакет был успешно сохранен в базе данных. Я попытался передать объект подтверждения службе кеширования, чтобы вызвать метод подтверждения, как показано ниже.
public class KafkaConsumer {
@KafkaListener( topicPattern = "${kafka.topicpattern}", containerFactory = "kafkaListenerContainerFactory" )
public void receive( ConsumerRecord<String, String> record, Acknowledgment acknowledgment ) {
cacheService.add( record.value(), acknowledgment );
}
}
public class CacheService {
// concurrency handling has been left out in favor of readability
public void add( String record, Acknowledgment acknowledgment ) {
this.records.add(record);
this.lastAcknowledgment = acknowledgment;
}
public void saveBatch() { //called by scheduled task
if( records.size() == BATCH_SIZE ) {
// perform batch insert into database
this.lastAcknowledgment.acknowledge();
this.records.clear();
}
}
}
AckMode был установлен следующим образом:
factory.getContainerProperties().setAckMode( AbstractMessageListenerContainer.AckMode.MANUAL );
И автоматическая фиксация ложна:
config.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );
Даже если вызывается метод подтверждения, смещение фиксации не обновляется. Как лучше всего обновить смещение фиксации после сохранения записей?
Использую spring-kafka 2.1.7.RELEASE.
ИЗМЕНИТЬ: после подтверждения @GaryRussell < / a> что подтверждения, сделанные внешними потоками, выполняются потоком-потребителем во время следующего опроса, я перепроверил свой код и обнаружил ошибку в том, как установлен последний объект подтверждения. После исправления этого смещение фиксации ОБНОВЛЯЕТСЯ должным образом. Итак, этот вопрос решен. Однако у меня нет возможности отметить этот вопрос как ответ.
scheduled task
это другой поток? Я считаю, что этотlastAcknowledgment.acknowledge();
должен вызываться в потребительском потоке, и как вы определяете, что смещение не отправляется? - person Deadpool   schedule 27.01.2019