Реализовать код Spring без аннотаций

Я хочу переписать этот код Spring AMQP без аннотаций:

@SpringBootApplication
public class So51009346Application {

    public static final String QUEUE_PROCESSING_TRANSACTION = "q1";

    public static void main(String[] args) {
        SpringApplication.run(So51009346Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            ReplyObject reply = (ReplyObject) template.convertSendAndReceive("ex", "rk", new RequestObject());
            System.out.println(reply);
        };
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory cf, Listener listener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
        container.setQueueNames(QUEUE_PROCESSING_TRANSACTION);
        container.setMessageListener(new MessageListenerAdapter(listener, "process"));
        return container;
    }

    @Bean
    public Queue queue() {
        return new Queue(QUEUE_PROCESSING_TRANSACTION);
    }

    @Bean
    public TopicExchange te() {
        return new TopicExchange("ex");
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(te()).with("rk");
    }

}

class RequestObject implements Serializable {

    private static final long serialVersionUID = 1L;

}

class ReplyObject implements Serializable {

    private static final long serialVersionUID = 1L;

}

@Component
class Listener {

    public ReplyObject process(RequestObject ro) {
        return new ReplyObject();
    }

}

Я попробовал этот простой код Java без аннотаций:

Код производителя:

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;

import org.plugin.database.bean.TransactionsBean;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@WebListener
public class ContextServer implements ServletContextListener {

    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
    Connection connection = connectionFactory.createConnection();
    AmqpAdmin admin = new RabbitAdmin(connectionFactory);

    admin.declareExchange(new TopicExchange(EXCHANGE_PROCESSING));
    admin.declareQueue(new Queue(QUEUE_PROCESSING_TRANSACTION, false));

    admin.declareBinding(BindingBuilder.bind(new Queue(QUEUE_PROCESSING_TRANSACTION, false)).to(new TopicExchange(EXCHANGE_PROCESSING)).with(ROUTING_KEY_PROCESSING_TRANSACTION));              
    RabbitTemplate template = new RabbitTemplate(connectionFactory);

    TransactionsBean obj = new TransactionsBean();      
    obj.setMerchant_id(232323);

    template.setReplyTimeout(600000);
    TransactionsBean reply = (TransactionsBean) template.convertSendAndReceive(EXCHANGE_PROCESSING, ROUTING_KEY_PROCESSING_TRANSACTION, obj);
    System.out.println("!!!!! Received Transaction_id " + reply.getTransaction_id());   

    @Override
    public final void contextDestroyed(final ServletContextEvent sce) {     
    }
}

Код потребителя:

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;

import org.plugin.database.bean.TransactionsBean;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.stereotype.Component;

    @SpringBootApplication
@WebListener
public class ContextServer implements ServletContextListener {

    private static String QUEUE_PROCESSING_TRANSACTION = "processing-process-queue";
    private static final String EXCHANGE_PROCESSING = "processing";
    private static final String ROUTING_KEY_PROCESSING_TRANSACTION = "processing.trx.process";

    @Override
    public final void contextInitialized(final ServletContextEvent contextEvent) {

        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
    AmqpAdmin admin = new RabbitAdmin(connectionFactory);
    admin.declareExchange(new TopicExchange(EXCHANGE_PROCESSING));
    admin.declareQueue(new Queue(QUEUE_PROCESSING_TRANSACTION, false));

    admin.declareBinding(BindingBuilder.bind(new Queue(QUEUE_PROCESSING_TRANSACTION, false))
            .to(new TopicExchange(EXCHANGE_PROCESSING)).with(ROUTING_KEY_PROCESSING_TRANSACTION));
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory cf, Listener listener) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
    container.setQueueNames(QUEUE_PROCESSING_TRANSACTION);
    container.setMessageListener(new MessageListenerAdapter(listener, "process"));
    container.setMessageConverter(new SerializerMessageConverter());//basic converter for java.io.Serializable POJO
    return container;
    }

    @Override
    public final void contextDestroyed(final ServletContextEvent sce) {
    }

    @Component
    class Listener {

        public TransactionsBean process(TransactionsBean ro) {
            TransactionsBean obj = new TransactionsBean();
            obj.setTransaction_id("some_id");
            return obj;
        }
    }
}

Стек ошибок:

   13:20:44,797 INFO  [org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer] (ServerService Thread Pool -- 79) Container initialized for queues: [amq.rabbitmq.reply-to]
13:20:44,803 INFO  [org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer] (ServerService Thread Pool -- 79) SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-oWxWZJEPgZyP-gRxWe-Ifg identity=42eb5972] started
13:20:49,846 ERROR [org.jboss.msc.service.fail] (ServerService Thread Pool -- 79) MSC000001: Failed to start service jboss.undertow.deployment.default-server.default-host./rest_api: org.jboss.msc.service.StartException in service jboss.undertow.deployment.default-server.default-host./rest_api: java.lang.RuntimeException: java.lang.NullPointerException
    at org.wildfly.extension.undertow//org.wildfly.extension.undertow.deployment.UndertowDeploymentService$1.run(UndertowDeploymentService.java:81)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:514)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at [email protected]//org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
    at [email protected]//org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:1985)
    at [email protected]//org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1487)
    at [email protected]//org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1378)
    at java.base/java.lang.Thread.run(Thread.java:844)
    at [email protected]//org.jboss.threads.JBossThread.run(JBossThread.java:485)
Caused by: java.lang.RuntimeException: java.lang.NullPointerException
    at io.undertow.servlet//io.undertow.servlet.core.DeploymentManagerImpl.deploy(DeploymentManagerImpl.java:251)
    at org.wildfly.extension.undertow//org.wildfly.extension.undertow.deployment.UndertowDeploymentService.startContext(UndertowDeploymentService.java:96)
    at org.wildfly.extension.undertow//org.wildfly.extension.undertow.deployment.UndertowDeploymentService$1.run(UndertowDeploymentService.java:78)
    ... 8 more
Caused by: java.lang.NullPointerException
    at deployment.rest_api.war//org.rest.api.context.ContextServer.contextInitialized(ContextServer.java:43)
    at io.undertow.servlet//io.undertow.servlet.core.ApplicationListeners.contextInitialized(ApplicationListeners.java:187)
    at io.undertow.servlet//io.undertow.servlet.core.DeploymentManagerImpl$1.call(DeploymentManagerImpl.java:215)
    at io.undertow.servlet//io.undertow.servlet.core.DeploymentManagerImpl$1.call(DeploymentManagerImpl.java:184)
    at io.undertow.servlet//io.undertow.servlet.core.ServletRequestContextThreadSetupAction$1.call(ServletRequestContextThreadSetupAction.java:42)
    at io.undertow.servlet//io.undertow.servlet.core.ContextClassLoaderSetupAction$1.call(ContextClassLoaderSetupAction.java:43)
    at org.wildfly.extension.undertow//org.wildfly.extension.undertow.security.SecurityContextThreadSetupAction.lambda$create$0(SecurityContextThreadSetupAction.java:105)
    at org.wildfly.extension.undertow//org.wildfly.extension.undertow.deployment.UndertowDeploymentInfoService$UndertowThreadSetupAction.lambda$create$0(UndertowDeploymentInfoService.java:1526)
    at org.wildfly.extension.undertow//org.wildfly.extension.undertow.deployment.UndertowDeploymentInfoService$UndertowThreadSetupAction.lambda$create$0(UndertowDeploymentInfoService.java:1526)
    at org.wildfly.extension.undertow//org.wildfly.extension.undertow.deployment.UndertowDeploymentInfoService$UndertowThreadSetupAction.lambda$create$0(UndertowDeploymentInfoService.java:1526)
    at org.wildfly.extension.undertow//org.wildfly.extension.undertow.deployment.UndertowDeploymentInfoService$UndertowThreadSetupAction.lambda$create$0(UndertowDeploymentInfoService.java:1526)
    at io.undertow.servlet//io.undertow.servlet.core.DeploymentManagerImpl.deploy(DeploymentManagerImpl.java:249)
    ... 10 more

13:20:49,885 ERROR [org.jboss.as.controller.management-operation] (DeploymentScanner-threads - 2) WFLYCTL0013: Operation ("deploy") failed - address: ([("deployment" => "rest_api.war")]) - failure description: {"WFLYCTL0080: Failed services" => {"jboss.undertow.deployment.default-server.default-host./rest_api" => "java.lang.RuntimeException: java.lang.NullPointerException
    Caused by: java.lang.RuntimeException: java.lang.NullPointerException
    Caused by: java.lang.NullPointerException"}}
13:20:49,908 INFO  [org.jboss.as.server] (DeploymentScanner-threads - 2) WFLYSRV0010: Deployed "rest_api.war" (runtime-name : "rest_api.war")

Сообщение от Producer успешно отправлено. Я получаю NPE в этой строке в Producer: reply.getTransaction_id() Как правильно реализовать Consumer без аннотаций с базовым кодом Java?


person Peter Penzov    schedule 24.06.2018    source источник


Ответы (2)


Как я объяснил в мой ответ на ваш предыдущий вопрос, если вы не знакомы со Spring, лучший способ начать работу — использовать Spring Boot. Перейдите на http://start.spring.io и создайте новый проект, выбрав RabbitMQ в качестве зависимости. Это именно то, что я сделал для примера кода.

Если вы настаиваете на развертывании собственного кода и не используете Spring для управления зависимостями и жизненными циклами этих компонентов; вам нужно сделать эту работу самостоятельно - позвонить afterPropertiesSet(), start() и т. д.

я получаю NPE

Это совершенно недостаточная информация. Вам нужно показать полную трассировку стека и весь код, включая файл TransactionsBean.

Поверьте мне; «без аннотаций» означает, что вам придется писать намного больше кода самостоятельно.

ИЗМЕНИТЬ

System.out.println(" !!!!!!!! Transaction_id " + receivedobj.getTransaction_id());

Конечно, вы получите NPE — на самом деле вы не проверяете, что возвращаемый объект не является нулевым; шаблон имеет тайм-аут ответа по умолчанию, равный 5 секундам, после чего возвращается null.

Контейнер не будет ничего делать, пока он не будет start()ed; опять таки; пожалуйста, научитесь использовать Spring для управления жизненным циклом этих объектов.

person Gary Russell    schedule 25.06.2018
comment
Я добавил все, что я пробовал до сих пор. Но мне нужна помощь, чтобы заставить код работать. Я определенно начну изучать Spring, но сейчас, чтобы заставить этот код проверки концепции работать как можно быстрее, мне нужна ваша помощь. - person Peter Penzov; 25.06.2018
comment
Я попытался применить изменения, но я получаю NPE. Можете ли вы посоветовать, как решить проблему? - person Peter Penzov; 26.06.2018
comment
Вы получаете NPE, потому что не проверяете нулевой результат. Мне кажется, что вы отправляете сообщение до того, как Spring инициализирует контекст приложения. Согласно contextInitialized javadocs: >Receives notification that the web application initialization process is starting. Подождите, пока приложение не заработает, прежде чем использовать RabbitMQ. - person Gary Russell; 26.06.2018
comment
есть ли обходной путь? Например, вызвать Spring сразу после contextInitialized? - person Peter Penzov; 26.06.2018
comment
Вам нужно подождать до тех пор, пока позже - например, как в ApplicationRunner в моем примере - эти bean-компоненты запускаются после того, как Spring завершил инициализацию. - person Gary Russell; 26.06.2018

Здесь есть несколько проблем:

  1. Как упоминалось выше @Gary Russell, сначала нет времени ожидания соединения.

  2. Основная проблема получения null заключается в том, что convertSendAndReceive работает с POJO, который должен быть сериализуемым, а сериализатор должен быть описан на стороне потребителя:

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory cf, Listener listener) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
    container.setQueueNames(QUEUE_PROCESSING_TRANSACTION);
    container.setMessageListener(new MessageListenerAdapter(listener, "process"));
    container.setMessageConverter(new SerializerMessageConverter());//basic converter for java.io.Serializable POJO
    return container;
}
person dkashirin    schedule 26.06.2018
comment
Большое спасибо за ответ, но после того, как я применил изменения, я получаю NPE. - person Peter Penzov; 26.06.2018