Я пытаюсь создать простую загрузку Kafka Consumer для использования сообщений из темы kafka, однако сообщения не потребляются, поскольку метод KafkaListener не запускается.
Я видел в других ответах, чтобы убедиться, что AUTO_OFFSET_RESET_CONFIG установлен на «самый ранний» и что GROUP_ID_CONFIG является уникальным, что я и сделал, но все же KafkaListenerMethod не запускается. Приложение просто запускается и ничего не делает:
Application Started
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.1.4.RELEASE)
Это подпроект другого проекта gradle, и build.gradle для этого подпроекта выглядит следующим образом (MAIN_CLASS_PATH правильно указан в коде):
apply plugin: 'application'
mainClassName = <MAIN_CLASS_PATH>
dependencies {
compile "org.springframework.kafka:spring-kafka:${SpringKafkaVersion}"
compile "org.springframework.boot:spring-boot-starter:${SpringBootVersion}"
compile group: 'org.springframework', name: 'spring-tx', version: '5.2.2.RELEASE'
}
Классы Java:
Start.java:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Start {
public static void main(String[] args) {
try {
System.out.println("Application Started");
SpringApplication.run(Start.class, args);
} catch (Exception e) {
e.printStackTrace();
}
}
}
KafkaConsumerConfig.java:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.UUID;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
HashMap<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
<KAFKA_SERVER_ADDRESS>);
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
UUID.randomUUID().toString());
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
KafkaConsumer.java
@Service
public class KafkaConsumer {
@KafkaListener(topics = <TOPIC_NAME>)
public void consume(@Payload ConsumerRecord<String, String> message) {
System.out.println("Consumed message: " + message);
}
}
KAFKA_SERVER_ADDRESS и TOPIC_NAME правильно указаны в моем коде. Также я проверил, что тема уже содержит сообщения.
Любые идеи относительно того, почему это не потребляет никаких сообщений из темы кафки?
KafkaConsumer
в том же пакете (или подпакете), что иStart
? - person Gary Russell   schedule 06.01.2020ktest24: partitions assigned: [ktest24-0, ktest24-9, ...
. При ведении журнала DEBUG вы должны видетьReceived: 0 records
иCommit list: {}
каждые 5 секунд (по умолчанию), если нечего извлекать. Если вы их не видите, что-то застряло, и вам придется сделать дамп потока, чтобы увидеть, что делает потребительский поток. - person Gary Russell   schedule 07.01.2020