Служебная шина Azure - Spring Boot отключить автозапуск (com.microsoft.azure: azure-servicebus-spring-boot-starter)

У меня есть следующая реализация для получения сообщения из служебной шины Azure с использованием приложения Spring Boot, однако я хочу иметь возможность управлять ServiceBusConsumer, чтобы автоматически начать прослушивание темы с помощью свойства профиля загрузки Spring.

что-то вроде этого в application.yaml

 servicebus.consumer.enable=false

он должен отключить ServiceBusConsumer от прослушивания темы (-ов), а также я должен иметь возможность запускать ServiceBusConsumer с помощью REST API, например: ./api/servicebus/consumer/start?

import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageHandler;
import com.microsoft.azure.servicebus.ISubscriptionClient;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;

import java.util.concurrent.CompletableFuture;

@Log4j2
@Component
class ServiceBusConsumer implements Ordered {

    private final ISubscriptionClient iSubscriptionClient;

    ServiceBusConsumer(ISubscriptionClient isc) {
        this.iSubscriptionClient = isc;
    }

    @EventListener(ApplicationReadyEvent.class)
    public void consume() throws Exception {

        this.iSubscriptionClient.registerMessageHandler(new IMessageHandler() {

            @Override
            public CompletableFuture<Void> onMessageAsync(IMessage message) {
                log.info("received message " + new String(message.getBody()) + " with body ID " + message.getMessageId());
                return CompletableFuture.completedFuture(null);
            }

            @Override
            public void notifyException(Throwable exception, ExceptionPhase phase) {
                log.error("eeks!", exception);
            }
        });

    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
}

person Mugil Karthikeyan    schedule 12.02.2021    source источник


Ответы (1)


Вы можете создать bean-компонент ServiceBusConsumer условно, добавив аннотацию @ConditionalOnProperty следующим образом, чтобы убедиться, что bean-компонент создается только тогда, когда servicebus.consumer.enabled=true:

@Log4j2
@Component
@ConditionalOnProperty(prefix = "servicebus.consumer", name = "enabled") 
class ServiceBusConsumer implements Ordered {
...
}
person Domenico Sibilio    schedule 12.02.2021
comment
Спасибо, я также смогу запустить ServiceBusConsumer с помощью REST API, например: ./api/servicebus/consumer/start? Можете ли вы рассказать мне, как этого добиться? - person Mugil Karthikeyan; 12.02.2021
comment
Вы можете ввести ISubscriptionClient, чтобы он был доступен контроллеру REST, и зарегистрировать там обработчик сообщений, как только кто-то вызовет REST API. Если вы хотите остановить активный ISubscriptionClient, вы можете проверить методы close / closeAsync ISubscriptionClient. В противном случае вы также можете проверить методы подписки / отмены подписки ServiceBusTopicTemplate или служебную шину Spring Cloud Stream Binder, которая также включает функцию привязки запуска и остановки: org.springframework.cloud.stream.binder.Binding ‹T› start / stop - person Domenico Sibilio; 12.02.2021