У меня есть потребитель Kafka High level.
public class KafkaHighLevelConsumer implements Runnable {
private final KafkaConsumer<String, String> consumer;
private final List<String> topics;
private final int id;
public KafkaHighLevelConsumer(int id,
String groupId,
List<String> topics,BlockingQueue<String> storyQueue) {
this.id = id;
this.topics = topics;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9091");
props.put("group.id", groupId);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<>(props);
}
@Override
public void run() {
try {
consumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
Map<String, Object> data = new HashMap<>();
data.put("partition", record.partition());
data.put("offset", record.offset());
data.put("value", record.value());
System.out.println(this.id + ": " + data);
}
}
} catch (WakeupException e) {
// ignore for shutdown
}finally {
consumer.close();
}
}
public void shutdown() {
consumer.wakeup();
}
}
Потребитель работает нормально, но мне нужно следить за состоянием потребителя. Почему у нас нет исключения, если IP-адрес или порт сервера неверны?
Если я изменю порт на какой-то неправильный props.put("bootstrap.servers", "localhost:9091");
на props.put("bootstrap.servers", "localhost:100500");
, я все равно не смогу получить никаких исключений.
Я хотел бы знать, успешно ли я подключился к Kafka или нет! Можно ли справиться с таким случаем?
Я использую такие депы Maven
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
Спасибо!