Я использую Spring Boot 2.0.2.RELEASE
со Spring для Apache Kafka (эффективный pom показывает 2.1.6.RELEASE
версию для spring-kafka).
Я перешел от использования обычного ByteArrayDeserializer
к использованию десериализатора Confluent
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
В результате мне не нужно получать байты, а затем десериализовать их в полезную нагрузку и т. Д. Но побочным эффектом этого является некоторые из старых сообщений - я больше не могу читать, потому что их схема немного отличается в объединенном реестре. .
Поэтому, когда я запускаю приложение, я все время получаю это сообщение
2018-08-17 17:58:51.360 ERROR 18004 --- [ntainer#0-0-C-1] o.s.k.listener.BatchLoggingErrorHandler : Error while processing:
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition ri00-q-log-et-final-0 at offset 36833. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Итак, я решил, что мне нужно начать слушать с конца темы, я проверил документацию https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek, в котором предлагалось реализовать ConsumerSeekAware и его субинтерфейс ConsumerSeekAware.ConsumerSeekCallback
я изменил класс @service, содержащий метод @KafkaListener, чтобы реализовать интерфейс, упомянутый в документации.
@Service
public class MyAvroListener implements
ConsumerSeekAware.ConsumerSeekCallback,ConsumerSeekAware {
и у него есть аннотированный метод @kafkalistener, в котором я пытался seekToEnd раздела
@KafkaListener(topics = "${topic}", containerFactory = "myAvroListenerFactory")
public void listen(final Acknowledgment ack, final List<ConsumerRecord<String, EclLogging>> messages) throws Exception {
this.seekCallBack.get().seekToEnd(topic,0);
try {
for (ConsumerRecord<String, EclLogging> kafkaRecord : messages) {
Я также пробовал искать конкретное смещение (потому что я все время застреваю на сообщении смещения 36833)
@KafkaListener(topics = "${topic}", containerFactory = "myAvroListenerFactory")
public void listen(final Acknowledgment ack, final List<ConsumerRecord<String, EclLogging>> messages) throws Exception {
this.seekCallBack.get().seek(topic,0,36900);
try {
for (ConsumerRecord<String, EclLogging> kafkaRecord : messages) {
Я реализовал методы из вышеуказанных интерфейсов
private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();
@Override
public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
this.seekCallBack.set(consumerSeekCallback);
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
}
@Override
public void seek(String s, int i, long l) {
}
@Override
public void seekToBeginning(String s, int i) {
}
@Override
public void seekToEnd(String topic, int partition) {
System.out.println("seekToEnd is hit for topic s = " + topic + " and partition i=" + partition);
}
Когда приложение запускается, метод registerSeekCallBack попадает, но метод seekToEnd или метод поиска не попадают.
и поэтому я продолжаю получать это сообщение
2018-08-17 17:58:51.360 ERROR 18004 --- [ntainer#0-0-C-1] o.s.k.listener.BatchLoggingErrorHandler : Error while processing:
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition ri00-q-log-et-final-0 at offset 36833. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Я использую отсюда фрагмент Пример реализации шаблона Spring Kafka для поиска смещение, подтверждение
Как упоминалось здесь, Что определяет смещение потребителя Kafka?, я не могу использовать auto.offset.reset, чтобы начать использование с конца темы (если я не использую другой идентификатор consumerGroupId, что в моем случае невозможно). Мне интересно, могу ли я решить эту проблему, используя существующую группу потребителей.