Как проверить состояние потребителя Kafka

У меня есть потребитель Kafka High level.

public class KafkaHighLevelConsumer implements Runnable {
    private final KafkaConsumer<String, String> consumer;
    private final List<String> topics;
    private final int id;

    public KafkaHighLevelConsumer(int id,
                        String groupId,
                        List<String> topics,BlockingQueue<String> storyQueue) {
        this.id = id;
        this.topics = topics;
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9091");
        props.put("group.id", groupId);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(props);
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(topics);

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    Map<String, Object> data = new HashMap<>();
                    data.put("partition", record.partition());
                    data.put("offset", record.offset());
                    data.put("value", record.value());
                    System.out.println(this.id + ": " + data);
                }
            }
        } catch (WakeupException e) {
            // ignore for shutdown
        }finally {
            consumer.close();
        }
    }

    public void shutdown() {
        consumer.wakeup();
    }
}

Потребитель работает нормально, но мне нужно следить за состоянием потребителя. Почему у нас нет исключения, если IP-адрес или порт сервера неверны?

Если я изменю порт на какой-то неправильный props.put("bootstrap.servers", "localhost:9091"); на props.put("bootstrap.servers", "localhost:100500");, я все равно не смогу получить никаких исключений.

Я хотел бы знать, успешно ли я подключился к Kafka или нет! Можно ли справиться с таким случаем?

Я использую такие депы Maven

<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.9.0.1</version>
        </dependency>

Спасибо!


person Aleksandr Filichkin    schedule 29.06.2016    source источник
comment
Вы можете использовать kafka-consumer-groups.sh, чтобы увидеть потребителей тем и их состояния.   -  person MGolovanov    schedule 01.07.2016
comment
Исключения нет, потому что клиент предполагает, что брокер может просто не работать. Клиент подключится к брокеру, если он выйдет в сеть.   -  person Matthias J. Sax    schedule 02.07.2016


Ответы (2)


Согласно документации клиента (https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html):

Он будет прозрачно обрабатывать сбои серверов в кластере Kafka.

Библиотека не может определить разницу между временным сбоем, постоянным сбоем или неправильной конфигурацией. Таким образом, он считает каждую неудачу «повторной попыткой» и делает именно это, повторяет попытки вечно, никогда не возвращая ошибку.

Это обходной путь для проверки состояния соединения: запросите некоторую информацию, которая гарантированно вернется, подождите некоторое разумное время, и если ничего не вернется, вы знаете, что с соединением возникла проблема:

int CONNECTION_TEST_TIMEOUT_SECONDS = 10; // or whatever is appropriate for your environment

ExecutorService executor = Executors.newSingleThreadExecutor();
Runnable testTask = consumer::listTopics;

Future future = executor.submit(testTask);
try {
    future.get(CONNECTION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (TimeoutException te) {
    consumer.wakeup();
    throw new IOException("Could not communicate with the server within " + CONNECTION_TEST_TIMEOUT_SECONDS + " seconds");
} catch (InterruptedException e) {
    // Nothing to do. Maybe a warning in the log?
} catch (ExecutionException e) {
    throw new IOException("Exception while running connection test: " + e.getMessage(), e);
}
person gomesrodris    schedule 07.07.2017
comment
Реализация этого работает, но при запуске возникает исключение: java.util.concurrent.ExecutionException: java.util.ConcurrentModificationException: KafkaConsumer небезопасен для многопоточного доступа - person Jvalant Dave; 30.08.2019

В сливающемся Kafka OnError есть событие, которое вызывает ошибку всякий раз, когда потребителю не удается подключиться. Вот описание кода

    //
    // Summary:
    //     Raised on critical errors, e.g. connection failures or all brokers down. Note
    //     that the client will try to automatically recover from errors - these errors
    //     should be seen as informational rather than catastrophic
    //
    // Remarks:
    //     Executes on the same thread as every other Consumer event handler (except OnLog
    //     which may be called from an arbitrary thread).
    public event EventHandler<Error> OnError;

Сборка Confluent.Kafka, версия = 0.11.6.0, культура = нейтральная, PublicKeyToken = null

person Muhammad Faizan Khan    schedule 19.02.2019