Обновить смещение фиксации Kafka после успешной пакетной вставки

У меня есть потребитель spring-kafka, который читает записи и передает их в кеш. Запланированная задача будет периодически очищать записи в кеше. Я хочу обновить COMMIT OFFSET только после того, как пакет был успешно сохранен в базе данных. Я попытался передать объект подтверждения службе кеширования, чтобы вызвать метод подтверждения, как показано ниже.

public class KafkaConsumer {
    @KafkaListener( topicPattern = "${kafka.topicpattern}", containerFactory = "kafkaListenerContainerFactory" )
    public void receive( ConsumerRecord<String, String> record, Acknowledgment acknowledgment ) {
        cacheService.add( record.value(), acknowledgment );
    }
}

public class CacheService {
    // concurrency handling has been left out in favor of readability
    public void add( String record, Acknowledgment acknowledgment ) {
        this.records.add(record);
        this.lastAcknowledgment = acknowledgment;
    }

    public void saveBatch() { //called by scheduled task
        if( records.size() == BATCH_SIZE ) {
            // perform batch insert into database
            this.lastAcknowledgment.acknowledge();
            this.records.clear();
        }
    }
}

AckMode был установлен следующим образом:

factory.getContainerProperties().setAckMode( AbstractMessageListenerContainer.AckMode.MANUAL );

И автоматическая фиксация ложна:

config.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );

Даже если вызывается метод подтверждения, смещение фиксации не обновляется. Как лучше всего обновить смещение фиксации после сохранения записей?

Использую spring-kafka 2.1.7.RELEASE.


ИЗМЕНИТЬ: после подтверждения @GaryRussell < / a> что подтверждения, сделанные внешними потоками, выполняются потоком-потребителем во время следующего опроса, я перепроверил свой код и обнаружил ошибку в том, как установлен последний объект подтверждения. После исправления этого смещение фиксации ОБНОВЛЯЕТСЯ должным образом. Итак, этот вопрос решен. Однако у меня нет возможности отметить этот вопрос как ответ.


person agentwarn    schedule 26.01.2019    source источник
comment
scheduled task это другой поток? Я считаю, что этот lastAcknowledgment.acknowledge(); должен вызываться в потребительском потоке, и как вы определяете, что смещение не отправляется?   -  person Deadpool    schedule 27.01.2019
comment
а какая у вас версия spring-kafka?   -  person Deadpool    schedule 27.01.2019
comment
Да, запланированная задача - это другой поток, потому что я не хочу блокировать потребление сообщений, пока они сохраняются. Используя инструмент командной строки kafka-consumer-groups.sh, я вижу, что текущее смещение не обновляется.   -  person agentwarn    schedule 27.01.2019
comment
Своя пружина-кафка 2.1.7.RELEASE   -  person agentwarn    schedule 27.01.2019
comment
вам нужно удерживать потребительский поток   -  person Deadpool    schedule 27.01.2019


Ответы (1)


Вот проблема, потребительский поток отвечает за фиксацию смещения. Во время опроса потребительский поток отправит предыдущее смещение пакета.

Поскольку в вашем случае AUTO_COMMIT ложно, а lastAcknowledgment.acknowledge() не подтверждает, что смещение не отправлено.

Только один способ сделать это. Как только вы получите записи опроса, сделайте Schedule задачу как Async и удерживайте поток потребителя и отправьте смещение после завершения задачи Async. Проверьте этот ответ для справки answer

Примечание. Если вы удерживаете потребительский поток более 5 минут, перебалансировка будет выполняться здесь < / а>

Новый потребитель Java теперь поддерживает биение пульса из фонового потока. Имеется новая конфигурация max.poll.interval.ms, которая контролирует максимальное время между вызовами опроса, прежде чем потребитель заблаговременно покинет группу (5 минут по умолчанию). Значение конфигурации request.timeout.ms всегда должно быть больше max.poll.interval.ms, потому что это максимальное время, в течение которого запрос JoinGroup может заблокировать на сервере, пока потребитель выполняет ребалансировку, поэтому мы изменили его значение по умолчанию на чуть более 5 минут. Наконец, значение по умолчанию session.timeout.ms было уменьшено до 10 секунд, а значение по умолчанию max.poll.records было изменено на 500.

Специальное примечание из spring kafka> 2.1.5

Подтверждения, сделанные во внешних потоках, будут выполнены потоком-потребителем непосредственно перед следующим опросом. Спасибо @Gary Russell за эту информацию.

person Deadpool    schedule 27.01.2019
comment
но здесь я считаю, что Ack не произойдет до следующего poll, как только потребительский поток освободится, он будет опрашивать следующее смещение, верно? @GaryRussell, а это коммит от 2.1.5? - person Deadpool; 27.01.2019
comment
Это правильно; Время, когда фактически происходит фиксация асинхронного смещения, зависит от состояния потока-потребителя при отправке подтверждения. Если он застрял в poll() (и больше нет доступных записей), фиксация произойдет, когда опрос завершится (время ожидания истекло) и непосредственно перед следующим опросом. Если опрос вернет еще несколько записей, фиксация не произойдет до тех пор, пока все эти записи были отправлены слушателю. processCommits() вызывается непосредственно перед poll() и фиксирует любые ожидающие смещения (все смещения, принятые из асинхронных потоков, добавляются в ожидающую очередь). Да, это было добавлено в 2.1.5. - person Gary Russell; 27.01.2019
comment
У нас нет выбора - Consumer не является потокобезопасным. Чтобы слушатель имел полный контроль, подтверждение должно выполняться в потоке-потребителе (а затем с помощью MANUAL_IMMEDIATE, чтобы фиксация произошла немедленно). - person Gary Russell; 27.01.2019
comment
@GaryRussell - Большое спасибо за подтверждение того, что подтверждение, сделанное внешними потоками, по-прежнему выполняется потоком-потребителем. После этого я проверил свой код и обнаружил ошибку в том, как устанавливается последнее подтверждение. После исправления этого смещение фиксации обновляется, как вы упомянули. - person agentwarn; 27.01.2019