Quarkus AMQP отправляет сообщение в очередь после запроса бизнес-логики

Как только я получаю HTTP-сообщение Get / Post, я должен настаивать и возражать, а затем отправлять сообщение в очередь, где другие службы слушают, чтобы начать выполнять другую сложную работу.

Моя текущая проблема заключается в том, что я не могу просто вызвать метод с аннотацией @Outgoing ("канал"), я попробовал это и просто продолжаю выполнять метод без вызова

Есть ли способ вызвать метод для отправки полезной нагрузки JSON в очередь с помощью платформы Quarkus?

PS: Я также пытаюсь использовать rabbitMQ и снова переключился на ActiveMQ

Я следовал инструкциям Quarkus по реактивному обмену сообщениями и пытался зарегистрировать что-то в реализованном ресурсе, но безуспешно.

@Path("/part")
class PartService : PanacheRepository<PartDao>, Logging {

    @GET
    @Produces(MediaType.APPLICATION_JSON)
    @Transactional
    fun fetchParts(): List<PartDao> {

        val partDao = PartDao(label = "Test", status = PartStatus.INBANK, creatorId = "ghost-007")

        partDao.persist()

        if (partDao.isPersistent) {
            // Send a message to a queue -> PoC
            send(partDao)
        }

        return findAll().list()
    }

    @Outgoing("part-persisted")
    @Transactional
    fun send(partDao: PartDao): CompletionStage<AmqpMessage<*>> {
        val future = CompletableFuture<AmqpMessage<*>>()
        val message = "hello from sender"

        // Debug proposes
        println("Sending (data): $message")
        logger.debug(partDao.toString())

        future.complete(AmqpMessage(message))
        return future
    }

}

Ожидал:

Зарегистрируйте сообщение "привет от отправителя" в очереди после выполнения:

curl http://localhost/part

Фактические результаты:

метод send просто продолжает выполняться


person MoLt1eS    schedule 23.07.2019    source источник
comment
Привет, можно ли разделить различные проблемы на разные классы? Сложно понять, что происходит сейчас   -  person geoand    schedule 24.07.2019
comment
Здравствуйте, лучше объяснить, что происходит: 1. Метод fetchParts выполняется, когда выполняется HTTP-запрос (например: curl localhost / parts) 2. Когда выполняется метод fetchParts (), мне нужно отправить сообщение в очередь, поэтому он вызывает метод send () 3. Метод send () возвращает результат в очередь сообщений, настроенную в аннотации @Outgoing. Проблема в том, что метод send () выполняется без остановок, а не при выполнении fetchParts   -  person MoLt1eS    schedule 24.07.2019
comment
Меня беспокоит, что объединение всех проблем в один и тот же класс может вызвать странное поведение.   -  person geoand    schedule 25.07.2019
comment
Это просто для тестирования порпусов   -  person MoLt1eS    schedule 25.07.2019
comment
Конечно, я понимаю, но меня беспокоит, что этот простой подход, предназначенный только для тестирования, сам по себе вызывает проблемы.   -  person geoand    schedule 25.07.2019


Ответы (1)


Если я правильно понимаю, вы хотите вызвать метод, который что-то помещал бы в поток.

Насколько мне известно, для этого нужно использовать Emitter, см., Например, https://github.com/michalszynkiewicz/devoxxpl-demo/blob/master/search/src/main/java/com/example/search/SearchEndpoint.java#L23

См. https://smallrye.io/smallrye-reactive-messaging/#_stream. документация.

person Michał Szynkiewicz    schedule 25.07.2019
comment
Большое спасибо! Это именно то, что мне нужно - person MoLt1eS; 28.07.2019