Spring для Apache Kafka: как искать конец раздела?

Я использую Spring Boot 2.0.2.RELEASE со Spring для Apache Kafka (эффективный pom показывает 2.1.6.RELEASE версию для spring-kafka).

Я перешел от использования обычного ByteArrayDeserializer к использованию десериализатора Confluent

properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);

В результате мне не нужно получать байты, а затем десериализовать их в полезную нагрузку и т. Д. Но побочным эффектом этого является некоторые из старых сообщений - я больше не могу читать, потому что их схема немного отличается в объединенном реестре. .

Поэтому, когда я запускаю приложение, я все время получаю это сообщение

    2018-08-17 17:58:51.360 ERROR 18004 --- [ntainer#0-0-C-1] o.s.k.listener.BatchLoggingErrorHandler  : Error while processing:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition ri00-q-log-et-final-0 at offset 36833. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Итак, я решил, что мне нужно начать слушать с конца темы, я проверил документацию https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek, в котором предлагалось реализовать ConsumerSeekAware и его субинтерфейс ConsumerSeekAware.ConsumerSeekCallback

я изменил класс @service, содержащий метод @KafkaListener, чтобы реализовать интерфейс, упомянутый в документации.

@Service
 public class MyAvroListener implements 
 ConsumerSeekAware.ConsumerSeekCallback,ConsumerSeekAware {

и у него есть аннотированный метод @kafkalistener, в котором я пытался seekToEnd раздела

@KafkaListener(topics = "${topic}", containerFactory = "myAvroListenerFactory")
    public void listen(final Acknowledgment ack, final List<ConsumerRecord<String, EclLogging>> messages) throws Exception {
    this.seekCallBack.get().seekToEnd(topic,0);
    try {
        for (ConsumerRecord<String, EclLogging> kafkaRecord : messages) {

Я также пробовал искать конкретное смещение (потому что я все время застреваю на сообщении смещения 36833)

@KafkaListener(topics = "${topic}", containerFactory = "myAvroListenerFactory")
        public void listen(final Acknowledgment ack, final List<ConsumerRecord<String, EclLogging>> messages) throws Exception {
        this.seekCallBack.get().seek(topic,0,36900);
        try {
            for (ConsumerRecord<String, EclLogging> kafkaRecord : messages) {

Я реализовал методы из вышеуказанных интерфейсов

private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();

@Override
public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
    this.seekCallBack.set(consumerSeekCallback);
}

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {

}

@Override
public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {

}

@Override
public void seek(String s, int i, long l) {

}

@Override
public void seekToBeginning(String s, int i) {

}

@Override
public void seekToEnd(String topic, int partition) {
    System.out.println("seekToEnd is hit for topic s = " + topic + " and partition i=" + partition);
}

Когда приложение запускается, метод registerSeekCallBack попадает, но метод seekToEnd или метод поиска не попадают.

и поэтому я продолжаю получать это сообщение

    2018-08-17 17:58:51.360 ERROR 18004 --- [ntainer#0-0-C-1] o.s.k.listener.BatchLoggingErrorHandler  : Error while processing:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition ri00-q-log-et-final-0 at offset 36833. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Я использую отсюда фрагмент Пример реализации шаблона Spring Kafka для поиска смещение, подтверждение

Как упоминалось здесь, Что определяет смещение потребителя Kafka?, я не могу использовать auto.offset.reset, чтобы начать использование с конца темы (если я не использую другой идентификатор consumerGroupId, что в моем случае невозможно). Мне интересно, могу ли я решить эту проблему, используя существующую группу потребителей.


person robin bajaj    schedule 17.08.2018    source источник


Ответы (3)


Вы выполняете поиск слишком поздно - после poll(), выбирающего записи; вам нужно выполнить поиск в

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {

}

позвонив туда consumerSeekCallback.seekToEnd(...). Поиск будет происходить до того, как poll() получит запись (записи).

Вы также можете использовать инструмент командной строки kafka-consumer-groups для установки произвольных смещений для группы / темы / раздела.

Текущая версия загрузки - 2.0.4, kafka 2.1.8.

Кроме того, вы не должны реализовывать обратный вызов, который передается вам.

Документация кажется ясной ...

При использовании группового управления второй метод вызывается при изменении назначений. Вы можете использовать этот метод, например, для установки начальных смещений для разделов, вызвав обратный вызов; вы должны использовать аргумент обратного вызова, а не тот, который передан в registerSeekCallback.

... если нет, что мы должны изменить?

person Gary Russell    schedule 18.08.2018
comment
я предоставил ответ ниже с рабочим кодом для будущих посетителей. - person robin bajaj; 21.08.2018

У вас есть тема, когда вы уже отправили сообщение с другой схемой.

эту проблему можно решить несколькими способами.

### Deletes all schema versions registered under the subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value
  [1]

### Deletes version 1 of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/1
  1

### Deletes the most recently registered schema under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/latest

Вышеупомянутые API в первую очередь предназначены для использования в среде разработки, где обычно проходят итерации перед окончательной доработкой схемы. Хотя его не рекомендуется использовать в производственной среде, существует несколько сценариев, в которых эти API можно использовать в производственной среде, но с особой осторожностью.

  • Новая регистрируемая схема имеет проблемы совместимости с одной из существующих версий схемы.

  • Необходимо повторно зарегистрировать старую версию схемы для того же объекта.

  • Схемы используются только в потоковых системах в реальном времени, и более старые версии больше не требуются.

  • Тема должна быть переработана

Также важно отметить, что любые зарегистрированные параметры совместимости для субъекта также будут удалены при использовании «Удалить тему» ​​или при удалении единственной доступной версии схемы.

Второй подход - начать отправлять сообщения в новую тему. Выполните следующие действия, и все будет в порядке.

  • Обновите Producer для отправки данных в новую тему, которая зарегистрирует обновленную схему в реестре схем.
  • Убедитесь, что Lag равен нулю для всех потребителей этой темы
  • Обновите потребителя, чтобы он потреблял данные из новой темы
  • Удалить старую тему из кафки
person Manjeet Duhan    schedule 18.08.2018

Основываясь на отзывах @Gary Russell, я внес в свой код следующие изменения, чтобы он заработал. Спасибо @Gary и @ Manjeet / @ cricket_007

Итак, я сделал следующее, и это сработало

По сути, никаких изменений в методе, аннотированном как @KafkaListener, но содержащий его класс должен реализовывать эти интерфейсы.

MyKafkaListenerClass implements ConsumerSeekAware.ConsumerSeekCallback,ConsumerSeekAware

а затем в этом классе я реализую методы из этих интерфейсов ...

    private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();

@Override
public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
    this.seekCallBack.set(consumerSeekCallback);
}

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
    this.seekCallBack.get().seekToEnd(topic,0);
}

@Override
public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {

}

@Override
public void seek(String topic, int partition, long offset) {
    System.out.println("seekToEnd is hit for topic= " + topic + " and partition=" + partition+ " and offset =" + offset);
}

@Override
public void seekToBeginning(String s, int i) {

}

@Override
public void seekToEnd(String topic, int partition) {
    System.out.println("seekToEnd is hit for topic s = " + topic + " and partition i=" + partition);
}
person robin bajaj    schedule 21.08.2018