Я хочу переписать этот код Spring AMQP без аннотаций:
/**
 * Spring Boot request/reply sample: publishes a {@link RequestObject} to the
 * topic exchange {@code "ex"} with routing key {@code "rk"} and prints the reply
 * produced by the listener container consuming from queue {@code "q1"}.
 */
@SpringBootApplication
public class So51009346Application {

    /** Name of the queue the listener container consumes from. */
    public static final String QUEUE_PROCESSING_TRANSACTION = "q1";

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(So51009346Application.class, args);
    }

    /**
     * Creates a runner that performs one request/reply round trip and prints the reply.
     *
     * @param template template used to send the request and wait for the reply
     * @return the runner
     */
    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            final RequestObject request = new RequestObject();
            final ReplyObject answer = (ReplyObject) template.convertSendAndReceive("ex", "rk", request);
            System.out.println(answer);
        };
    }

    /**
     * Creates the listener container that hands messages from the queue to
     * the {@code process} method of the given listener.
     *
     * @param cf       connection factory the container uses
     * @param listener delegate whose {@code process} method handles each message
     * @return the configured container
     */
    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory cf, Listener listener) {
        final MessageListenerAdapter adapter = new MessageListenerAdapter(listener, "process");
        final SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(cf);
        listenerContainer.setQueueNames(QUEUE_PROCESSING_TRANSACTION);
        listenerContainer.setMessageListener(adapter);
        return listenerContainer;
    }

    /**
     * @return the queue named by {@link #QUEUE_PROCESSING_TRANSACTION}
     */
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_PROCESSING_TRANSACTION);
    }

    /**
     * @return the topic exchange named {@code "ex"}
     */
    @Bean
    public TopicExchange te() {
        return new TopicExchange("ex");
    }

    /**
     * @return the binding of {@link #queue()} to {@link #te()} with routing key {@code "rk"}
     */
    @Bean
    public Binding binding() {
        final Queue source = queue();
        final TopicExchange target = te();
        return BindingBuilder.bind(source).to(target).with("rk");
    }
}
/** Empty serializable payload sent as the request of the round trip. */
class RequestObject implements Serializable {

    /** Serialization version of this payload. */
    private static final long serialVersionUID = 1L;
}
/** Empty serializable payload returned as the reply of the round trip. */
class ReplyObject implements Serializable {

    /** Serialization version of this payload. */
    private static final long serialVersionUID = 1L;
}
/** Message handler whose {@code process} method answers every request with a fresh reply. */
@Component
class Listener {

    /**
     * Handles one request.
     *
     * @param ro the incoming request (not inspected)
     * @return a new, empty {@link ReplyObject}
     */
    public ReplyObject process(RequestObject ro) {
        final ReplyObject response = new ReplyObject();
        return response;
    }
}
Я попробовал этот простой код Java без аннотаций:
Код производителя:
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;
import org.plugin.database.bean.TransactionsBean;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@WebListener
public class ContextServer implements ServletContextListener {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
Connection connection = connectionFactory.createConnection();
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareExchange(new TopicExchange(EXCHANGE_PROCESSING));
admin.declareQueue(new Queue(QUEUE_PROCESSING_TRANSACTION, false));
admin.declareBinding(BindingBuilder.bind(new Queue(QUEUE_PROCESSING_TRANSACTION, false)).to(new TopicExchange(EXCHANGE_PROCESSING)).with(ROUTING_KEY_PROCESSING_TRANSACTION));
RabbitTemplate template = new RabbitTemplate(connectionFactory);
TransactionsBean obj = new TransactionsBean();
obj.setMerchant_id(232323);
template.setReplyTimeout(600000);
TransactionsBean reply = (TransactionsBean) template.convertSendAndReceive(EXCHANGE_PROCESSING, ROUTING_KEY_PROCESSING_TRANSACTION, obj);
System.out.println("!!!!! Received Transaction_id " + reply.getTransaction_id());
@Override
public final void contextDestroyed(final ServletContextEvent sce) {
}
}
Код потребителя:
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;
import org.plugin.database.bean.TransactionsBean;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.SerializerMessageConverter;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
 * Consumer side: on web-application start-up, declares the exchange, queue and
 * binding, then creates and starts a listener container that answers every
 * {@link TransactionsBean} request with a reply.
 *
 * <p>The servlet container instantiates this {@code @WebListener} itself, so the
 * listener container has to be built and started by hand in
 * {@link #contextInitialized}; nothing else invokes {@link #container}.
 */
@SpringBootApplication
@WebListener
public class ContextServer implements ServletContextListener {

    private static final String QUEUE_PROCESSING_TRANSACTION = "processing-process-queue";
    private static final String EXCHANGE_PROCESSING = "processing";
    private static final String ROUTING_KEY_PROCESSING_TRANSACTION = "processing.trx.process";

    /** Created on start-up, released in {@link #contextDestroyed}. */
    private CachingConnectionFactory connectionFactory;

    /** The running consumer; {@code null} until start-up has completed. */
    private SimpleMessageListenerContainer listenerContainer;

    /**
     * Declares the broker topology and starts consuming from the queue.
     *
     * @param contextEvent servlet context start-up event (not used)
     */
    @Override
    public final void contextInitialized(final ServletContextEvent contextEvent) {
        connectionFactory = new CachingConnectionFactory("localhost");

        AmqpAdmin admin = new RabbitAdmin(connectionFactory);
        Queue queue = new Queue(QUEUE_PROCESSING_TRANSACTION, false);
        TopicExchange exchange = new TopicExchange(EXCHANGE_PROCESSING);
        admin.declareExchange(exchange);
        admin.declareQueue(queue);
        admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_PROCESSING_TRANSACTION));

        // No Spring context drives the container lifecycle here, so initialise and
        // start it explicitly; without this no consumer exists and the producer's
        // convertSendAndReceive never gets a reply.
        listenerContainer = container(connectionFactory, new Listener());
        listenerContainer.afterPropertiesSet();
        listenerContainer.start();
    }

    /**
     * Builds (but does not start) the listener container that hands messages
     * from the queue to the {@code process} method of the given listener.
     *
     * @param cf       connection factory the container uses
     * @param listener delegate whose {@code process} method handles each message
     * @return the configured, not yet started container
     */
    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory cf, Listener listener) {
        MessageListenerAdapter adapter = new MessageListenerAdapter(listener, "process");
        // The adapter converts the request and the reply, so the converter for
        // java.io.Serializable POJOs is set on it rather than on the container.
        adapter.setMessageConverter(new SerializerMessageConverter());

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
        container.setQueueNames(QUEUE_PROCESSING_TRANSACTION);
        container.setMessageListener(adapter);
        return container;
    }

    /**
     * Stops the consumer and releases the broker connections.
     *
     * @param sce servlet context shutdown event (not used)
     */
    @Override
    public final void contextDestroyed(final ServletContextEvent sce) {
        if (listenerContainer != null) {
            listenerContainer.stop();
            listenerContainer = null;
        }
        if (connectionFactory != null) {
            connectionFactory.destroy();
            connectionFactory = null;
        }
    }

    /** Message handler whose {@code process} method produces the reply for each request. */
    @Component
    class Listener {

        /**
         * Handles one request.
         *
         * @param ro the incoming request (not inspected)
         * @return a new bean whose transaction id is {@code "some_id"}
         */
        public TransactionsBean process(TransactionsBean ro) {
            TransactionsBean obj = new TransactionsBean();
            obj.setTransaction_id("some_id");
            return obj;
        }
    }
}
Трассировка стека:
13:20:44,797 INFO [org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer] (ServerService Thread Pool -- 79) Container initialized for queues: [amq.rabbitmq.reply-to]
13:20:44,803 INFO [org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer] (ServerService Thread Pool -- 79) SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-oWxWZJEPgZyP-gRxWe-Ifg identity=42eb5972] started
13:20:49,846 ERROR [org.jboss.msc.service.fail] (ServerService Thread Pool -- 79) MSC000001: Failed to start service jboss.undertow.deployment.default-server.default-host./rest_api: org.jboss.msc.service.StartException in service jboss.undertow.deployment.default-server.default-host./rest_api: java.lang.RuntimeException: java.lang.NullPointerException
at org.wildfly.extension.undertow//org.wildfly.extension.undertow.deployment.UndertowDeploymentService$1.run(UndertowDeploymentService.java:81)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:514)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at [email protected]//org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
at [email protected]//org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:1985)
at [email protected]//org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1487)
at [email protected]//org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1378)
at java.base/java.lang.Thread.run(Thread.java:844)
at [email protected]//org.jboss.threads.JBossThread.run(JBossThread.java:485)
Caused by: java.lang.RuntimeException: java.lang.NullPointerException
at io.undertow.servlet//io.undertow.servlet.core.DeploymentManagerImpl.deploy(DeploymentManagerImpl.java:251)
at org.wildfly.extension.undertow//org.wildfly.extension.undertow.deployment.UndertowDeploymentService.startContext(UndertowDeploymentService.java:96)
at org.wildfly.extension.undertow//org.wildfly.extension.undertow.deployment.UndertowDeploymentService$1.run(UndertowDeploymentService.java:78)
... 8 more
Caused by: java.lang.NullPointerException
at deployment.rest_api.war//org.rest.api.context.ContextServer.contextInitialized(ContextServer.java:43)
at io.undertow.servlet//io.undertow.servlet.core.ApplicationListeners.contextInitialized(ApplicationListeners.java:187)
at io.undertow.servlet//io.undertow.servlet.core.DeploymentManagerImpl$1.call(DeploymentManagerImpl.java:215)
at io.undertow.servlet//io.undertow.servlet.core.DeploymentManagerImpl$1.call(DeploymentManagerImpl.java:184)
at io.undertow.servlet//io.undertow.servlet.core.ServletRequestContextThreadSetupAction$1.call(ServletRequestContextThreadSetupAction.java:42)
at io.undertow.servlet//io.undertow.servlet.core.ContextClassLoaderSetupAction$1.call(ContextClassLoaderSetupAction.java:43)
at org.wildfly.extension.undertow//org.wildfly.extension.undertow.security.SecurityContextThreadSetupAction.lambda$create$0(SecurityContextThreadSetupAction.java:105)
at org.wildfly.extension.undertow//org.wildfly.extension.undertow.deployment.UndertowDeploymentInfoService$UndertowThreadSetupAction.lambda$create$0(UndertowDeploymentInfoService.java:1526)
at org.wildfly.extension.undertow//org.wildfly.extension.undertow.deployment.UndertowDeploymentInfoService$UndertowThreadSetupAction.lambda$create$0(UndertowDeploymentInfoService.java:1526)
at org.wildfly.extension.undertow//org.wildfly.extension.undertow.deployment.UndertowDeploymentInfoService$UndertowThreadSetupAction.lambda$create$0(UndertowDeploymentInfoService.java:1526)
at org.wildfly.extension.undertow//org.wildfly.extension.undertow.deployment.UndertowDeploymentInfoService$UndertowThreadSetupAction.lambda$create$0(UndertowDeploymentInfoService.java:1526)
at io.undertow.servlet//io.undertow.servlet.core.DeploymentManagerImpl.deploy(DeploymentManagerImpl.java:249)
... 10 more
13:20:49,885 ERROR [org.jboss.as.controller.management-operation] (DeploymentScanner-threads - 2) WFLYCTL0013: Operation ("deploy") failed - address: ([("deployment" => "rest_api.war")]) - failure description: {"WFLYCTL0080: Failed services" => {"jboss.undertow.deployment.default-server.default-host./rest_api" => "java.lang.RuntimeException: java.lang.NullPointerException
Caused by: java.lang.RuntimeException: java.lang.NullPointerException
Caused by: java.lang.NullPointerException"}}
13:20:49,908 INFO [org.jboss.as.server] (DeploymentScanner-threads - 2) WFLYSRV0010: Deployed "rest_api.war" (runtime-name : "rest_api.war")
Сообщение от Producer успешно отправлено. Я получаю NPE в этой строке в Producer: reply.getTransaction_id()
Как правильно реализовать Consumer без аннотаций с базовым кодом Java?