Spring Boot Kafka Consumer не использует, Kafka Listener не запускается

Я пытаюсь создать простую загрузку Kafka Consumer для использования сообщений из темы kafka, однако сообщения не потребляются, поскольку метод KafkaListener не запускается.

Я видел в других ответах, чтобы убедиться, что AUTO_OFFSET_RESET_CONFIG установлен на «самый ранний» и что GROUP_ID_CONFIG является уникальным, что я и сделал, но все же KafkaListenerMethod не запускается. Приложение просто запускается и ничего не делает:

Application Started

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.4.RELEASE)

Это подпроект другого проекта gradle, и build.gradle для этого подпроекта выглядит следующим образом (MAIN_CLASS_PATH правильно указан в коде):

apply plugin: 'application'

mainClassName = <MAIN_CLASS_PATH>

dependencies {
    compile "org.springframework.kafka:spring-kafka:${SpringKafkaVersion}"
    compile "org.springframework.boot:spring-boot-starter:${SpringBootVersion}"
    compile group: 'org.springframework', name: 'spring-tx', version: '5.2.2.RELEASE'
}

Классы Java:

Start.java:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Start {
    public static void main(String[] args) {
        try {
            System.out.println("Application Started");
            SpringApplication.run(Start.class, args);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

KafkaConsumerConfig.java:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.UUID;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        HashMap<String, Object> props = new HashMap<>();
        props.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                <KAFKA_SERVER_ADDRESS>);
        props.put(
                ConsumerConfig.GROUP_ID_CONFIG,
                UUID.randomUUID().toString());
        props.put(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
    kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

KafkaConsumer.java

@Service
public class KafkaConsumer {

    @KafkaListener(topics = <TOPIC_NAME>)
    public void consume(@Payload ConsumerRecord<String, String> message) {
        System.out.println("Consumed message: " + message);
    }
}

KAFKA_SERVER_ADDRESS и TOPIC_NAME правильно указаны в моем коде. Также я проверил, что тема уже содержит сообщения.

Любые идеи относительно того, почему это не потребляет никаких сообщений из темы кафки?


person Amish Bedi    schedule 06.01.2020    source источник
comment
Это выглядит нормально, хотя вам действительно не нужно определять фабрику потребителей или контейнеров - загрузка автоматически настроит их из соответствующего приложения (или yaml). Помогает ли включение ведения журнала DEBUG?   -  person Gary Russell    schedule 06.01.2020
comment
Привет, Гэри, согласно вашему предложению я добавил ведение журнала уровня DEBUG.   -  person Amish Bedi    schedule 06.01.2020
comment
Это помогло? Находится ли класс KafkaConsumer в том же пакете (или подпакете), что и Start?   -  person Gary Russell    schedule 06.01.2020
comment
Не помещайте подобные вещи в комментарии; он плохо отображается; вместо этого отредактируйте вопрос, отметив, что вы это сделали (мы не получаем уведомления об изменениях). Вы должны увидеть журнал INFO с чем-то вроде ktest24: partitions assigned: [ktest24-0, ktest24-9, .... При ведении журнала DEBUG вы должны видеть Received: 0 records и Commit list: {} каждые 5 секунд (по умолчанию), если нечего извлекать. Если вы их не видите, что-то застряло, и вам придется сделать дамп потока, чтобы увидеть, что делает потребительский поток.   -  person Gary Russell    schedule 07.01.2020


Ответы (2)


Проблема заключалась в подключении к кафке. Машине, потребляющей данные с сервера kafka, не было разрешено использовать данные с сервера kafka. Внесение в белый список потребляющей машины в узлах сервера kafka решило проблему.

Если URL-адрес сервера kafka может быть разрешен, но не может быть подключен из-за недостаточных разрешений, потребитель kafka не регистрирует это. Это не относится к spring kafka, но даже обычный потребитель клиентов kafka не регистрировал это.

person Amish Bedi    schedule 07.01.2020

Spring boot и spring kafka имеют автоконфигурацию. Для простого потребителя удалите класс конфигурации и добавьте эти свойства в свой application.yml (или .properties):

 spring.application.name: your-app-name
 spring.kafka.consumer.group-id: your-group-id
 spring.kafka.bootstrap-servers: server1:port1,server2:port2,server3:port3
person Javier Gonzalez Benito    schedule 06.01.2020