Невозможно выполнить модульное тестирование аннотированного метода @KafkaListener

Весной я пытаюсь провести модульное тестирование потребительского класса кафки. Я хочу знать, что если сообщение kafka отправлено в его тему, метод прослушивателя был вызван правильно. Мой потребительский класс аннотирован следующим образом:

@KafkaListener(topics = "${kafka.topics.myTopic}")
public void myKafkaMessageEvent(final String message) { ...

Если я @Autowire потребитель, когда я отправляю сообщение kafka, метод прослушивателя вызывается правильно, но я не могу утверждать, что метод был вызван, потому что класс не является макетом.

Если я издеваюсь над потребителем, когда я отправляю сообщение kafka, метод слушателя вообще не вызывается. Я могу вызвать метод напрямую и утверждать, что он сработал, но это не делает то, что я хочу, то есть проверяю, вызывается ли метод, когда я отправляю сообщение kafka в его тему.

На данный момент я прибегнул к размещению счетчика внутри потребителя и увеличивал его каждый раз, когда вызывается метод прослушивателя, а затем проверял, изменилось ли его значение. Создание переменной только для тестирования кажется мне ужасным решением.

Может быть, есть способ заставить издевательского потребителя также получать сообщения kafka? Или какой-то другой способ утверждать, что был вызван метод прослушивателя не издевательского потребителя?


person Jorge Bonafé    schedule 07.05.2018    source источник


Ответы (1)


Похоже, вы запрашиваете что-то похожее на то, что есть в Spring AMQP Testing Framework: https://docs.spring.io/spring-amqp/docs/2.0.3.RELEASE/reference/html/_reference.html.#test-harness

Итак, если вы не очень хорошо разбираетесь в дополнительной переменной, вы можете позаимствовать ее решение и внедрите свою собственную "обвязку".

Я думаю, что это должно быть хорошим дополнением к Framework, поэтому, пожалуйста, поднимите соответствующую проблему и мы вместе можем сделать такой инструмент доступным для всех.

ОБНОВЛЕНИЕ

Итак, в соответствии с фондом Spring AMQP я сделал это в своей тестовой конфигурации:

public static class KafkaListenerTestHarness extends KafkaListenerAnnotationBeanPostProcessor {

    private final Map<String, Object> listeners = new HashMap<>();

    @Override
    protected void processListener(MethodKafkaListenerEndpoint endpoint, KafkaListener kafkaListener,
            Object bean, Object adminTarget, String beanName) {

        bean = Mockito.spy(bean);

        this.listeners.put(kafkaListener.id(), bean);

        super.processListener(endpoint, kafkaListener, bean, adminTarget, beanName);
    }

    @SuppressWarnings("unchecked")
    public <T> T getSpy(String id) {
        return (T) this.listeners.get(id);
    }

}

...

@SuppressWarnings("rawtypes")
@Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public static KafkaListenerTestHarness kafkaListenerAnnotationBeanPostProcessor() {
    return new KafkaListenerTestHarness();
}

Затем в целевом тестовом примере я использую его следующим образом:

@Autowired
private KafkaListenerTestHarness harness;
...
Listener listener = this.harness.getSpy("foo");

verify(listener, times(2)).listen1("foo");
person Artem Bilan    schedule 07.05.2018
comment
Есть KafkaListenerAnnotationBeanPostProcessor для обработки всех этих @KafkaListener методов. Вам просто нужно следовать RabbitListenerTestHarness логике. Позвольте мне показать кое-что простое в моем ответе чуть позже! - person Artem Bilan; 07.05.2018
comment
Пожалуйста, найдите ОБНОВЛЕНИЕ в моем ответе. - person Artem Bilan; 07.05.2018
comment
Извините, я немного запутался... Итак, у меня есть это: @Autowired private TestConfig.KafkaListenerTestHarness harness; и это: @Autowired private ReceiveMessageRetrievedEventHandler receiver; Второй - мой потребитель с методом слушателя. Как бы я использовал ваш код здесь? И параметр foo я тоже не понимаю... Спасибо за помощь, извините, у меня с этим проблемы... - person Jorge Bonafé; 07.05.2018
comment
Обратите внимание, как я храню spy - в KafkaListenerTestHarness .listeners Map. И только так вы сможете получить доступ к spy. foo это id() на @KafkaListener. Посмотрите, что я получаю за ключ карты в файле KafkaListenerTestHarness. - person Artem Bilan; 07.05.2018
comment
Кажется, я понимаю... Но теперь я получаю сообщение об ошибке Another endpoint is already registered with id, от которого не могу избавиться... - person Jorge Bonafé; 08.05.2018
comment
Я, я использую этот идентификатор только в одном месте: @KafkaListener( topic ... id = "ReceiveMessageProductRetrievedEventHandlerID") public void productRetrievedEvent(final ProductTransaction message) { И затем в моем тесте: @ContextConfiguration(classes = {TestConfig.class ... ReceiveMessageProductRetrievedEventHandler.class}) @Autowired private TestConfig.KafkaListenerTestHarness harness; Ошибка возникает, если выполняется эта строка в обвязке: super.processListener(endpoint, kafkaListener, bean, adminTarget, beanName); Если я ее прокомментирую, она исчезнет. - person Jorge Bonafé; 09.05.2018
comment
Это не. Wanted but not invoked. здесь: verify(listener, times(1)).productRetrievedEvent(anyObject());. И у меня была точка останова на слушателе, она действительно была вызвана. Я собираюсь прочитать весь этот код еще раз, может я что-то упустил... - person Jorge Bonafé; 09.05.2018
comment
Я понимаю, что нигде не использую твой kafkaListenerAnnotationBeanPostProcessor(), он просто там... Верно? - person Jorge Bonafé; 09.05.2018
comment
Я не понимаю ваш последний вопрос. kafkaListenerAnnotationBeanPostProcessor() — это bean-компонент, который нужно просто объявить там. - person Artem Bilan; 09.05.2018
comment
Если это не работает для вас, то вам следует отказаться от такого решения и пойти другим путем. Кроме того, я уже просил вас поднять соответствующий вопрос GH по этому вопросу, и мы определенно сделаем что-то в Framework для этих случаев использования. - person Artem Bilan; 09.05.2018