Spring Boot Kafka Listener vs Consumer

Какая разница? Могут ли термины KafkaConsumer и KafkaListener использоваться как взаимозаменяемые?


person George    schedule 03.04.2019    source источник
comment
Они не взаимозаменяемы в коде Java. KafkaConsumer - это класс из клиентской библиотеки Kafka, который предоставляет API для приложений для получения сообщений. KafkaListener - это аннотация, применяемая к методу, поэтому Spring Kafka будет вызывать ее для обработки сообщения.   -  person Chin Huang    schedule 03.04.2019
comment
Так что же делает каждый?   -  person George    schedule 03.04.2019


Ответы (1)


@KafkaListener - это высокоуровневый API для ConcurrentMessageListenerContainer, который порождает несколько внутренних слушателей вокруг KafkaConsumer.

Разница в том, что этот KafkaConsumer API опрашивается по запросу, когда вы вызываете его poll() всякий раз, когда вам нужно. Абстракция слушателя собирается создать бесконечный цикл вокруг этого poll(), и он создает сообщения для записей всякий раз, когда они появляются из poll(). У нас есть исполнитель задач, который выполняет такую ​​логику:

        while (isRunning()) {
            try {
                pollAndInvoke();
            }
            catch (@SuppressWarnings(UNUSED) WakeupException e) {
                // Ignore, we're stopping
            }
            catch (NoOffsetForPartitionException nofpe) {
                this.fatalError = true;
                ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
                break;
            }
            catch (Exception e) {
                handleConsumerException(e);
            }
            catch (Error e) { // NOSONAR - rethrown
                Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
                if (runnable != null) {
                    runnable.run();
                }
                this.logger.error("Stopping container due to an Error", e);
                wrapUp();
                throw e;
            }
        }

KafkaConsumer.poll() вызывается в этом pollAndInvoke();.

person Artem Bilan    schedule 03.04.2019
comment
Когда я устанавливаю для свойства spring.kafka.listener.concurrency значение 4, будут ли созданы экземпляры 4 потребителей или это 4 слушателя? - person George; 03.04.2019
comment
Слушатель будет одинаковым для всех потребителей. Он не имеет состояния, поэтому вполне нормально не порождать больше слушателей. Параллелизм полностью зависит от количества разделов, которые вы собираетесь создать из конфигурации ваших тем для слушателя. Итак, если в конечном итоге у вас есть только один раздел, конфигурация параллелизма не имеет значения. - person Artem Bilan; 03.04.2019