Как вручную получать сообщения от rabbitmq с помощью интеграции spring

[EDIT] Загрузка полных конфигураций:

rabbit.xml, который удаляется из очереди Rabbit

<rabbit:connection-factory id="amqpConnectionFactoryInbound" 
host="${rabbit.host}" port="${rabbit.port}"
username="${rabbit.username}" password="${rabbit.password}" channel-
cache-size="5"
connection-factory="rabbitConnectionFactoryInbound"/>

<beans:bean id="rabbitConnectionFactoryInbound" 
class="com.rabbitmq.client.ConnectionFactory">
<beans:property name="requestedHeartbeat" 
value="60" />
</beans:bean>


<!-- Inbound Adapter to AMQP RabbitMq and write to file -->
<int-amqp:inbound-channel-adapter id="rabbitMQInboundChannelAdapter" 
channel="rabbitInboundMessageChannel"
concurrent-consumers="8" task-
executor="rabbit-executor" connection-
factory="amqpConnectionFactoryInbound"
message-converter="byteArrayToStringConverter" queue-
names="${rabbit.queue}" acknowledge-mode="MANUAL" error-
channel="errorChannelId"
prefetch-count="25" />

<header-enricher input-channel="rabbitInboundMessageChannel" output-
channel="rabbitOutboundboundMessageChannel">
<int:header name="Operation" value="${operation.rabbit}" />
<int:header name="GUID" expression="#{ 
'T(java.util.UUID).randomUUID().toString()' }" />
<int:header name="operationStartTime" expression="#{ 
'T(java.lang.System).currentTimeMillis()' }" />
</header-enricher>

<int:channel id="rabbitOutboundboundMessageChannel">
<int:interceptors>
<int:wire-tap channel="loggerChannel" />
</int:interceptors>
</int:channel>

<task:executor id="rabbit-executor" rejection-policy="CALLER_RUNS" 
pool-size="10-30"
queue-capacity="25" />
</beans:beans>

Затем сообщение отправляется на канал маршрутизатора: router.xml

<int:header-enricher input-channel="rabbitOutboundboundMessageChannel" 
output-channel="routerChannel">
<int:header name="Operation" value="${operation.router}" 
overwrite="true" />
<int:header name="file_name" expression="headers['GUID'] + '.xml'" />
<int:header name="operationStartTime" expression="#{ 
'T(java.lang.System).currentTimeMillis()' }"
overwrite="true" />
<int:error-channel ref="errorChannelId" />
</int:header-enricher>

<int:recipient-list-router id="rabbitMsgrouter" input-
channel="routerChannel">
<int:recipient channel="fileBackupChannel" selector-expression="new 
String(payload).length()>0" />
<int:recipient channel="transformerChannel" />
</int:recipient-list-router>

<int:channel id="transformerChannel">
<int:interceptors>
<int:wire-tap channel="loggerChannel" />
</int:interceptors>
</int:channel>
<int:channel id="fileBackupChannel"/>
<int:channel id="loggerChannel"/>
</beans>

Сообщение теперь отправляется в файл persister.xml и transform.xml. Ниже приведен файл persister.xml, и я хочу подтвердить его успешность. Существуют и другие нижестоящие процессы после transform.xml

<int:header-enricher input-channel="fileBackupChannel" output-
channel="fileSaveChannel">
<int:header name="Operation" value="${operation.filePersister}" 
overwrite="true" />
<int:header name="replyChannel" value="nullChannel" />
<int:header name="operationStartTime" expression="#{ 
'T(java.lang.System).currentTimeMillis()' }" />
<int:error-channel ref="errorChannelId" />
</int:header-enricher>

<int-file:outbound-gateway id="fileBackUpChannelAdapter" 
directory="${file.location}"
request-channel="fileSaveChannel" reply-channel="rabbitAckChannel"/>

<int:service-activator input-channel="rabbitAckChannel" output-
channel="nullChannel" ref="ackRabbit" method="handleRabbitAcks" />

<bean id="ackRabbit" 
class="com.expedia.dataloader.rabbit.RabbitAcknowledgement"/>

<int:channel id="rabbitAckChannel">
<int:interceptors>
<int:wire-tap channel="loggerChannel" />
</int:interceptors>
</int:channel>
<int:channel id="loggerChannel"/>
<int:channel id="fileSaveChannel"/>
</beans>

У меня возникли проблемы с ручным получением полезной нагрузки от rabbitmq.

Это мой рабочий процесс:

<сильный>1. Получить сообщение от кролика с помощью адаптера входящего канала:

<int-amqp:inbound-channel-adapter id="rabbitMQInboundChannelAdapter" 
channel="rabbitInboundMessageChannel"
concurrent-consumers="${rabbit.concurrentConsumers}" task-
executor="rabbit-executor" connection-
factory="amqpConnectionFactoryInbound"
message-converter="byteArrayToStringConverter" queue-
names="${rabbit.queue}" acknowledge-mode="MANUAL" error-
channel="errorChannelId"
prefetch-count="${rabbit.prefetchCount}" />

<сильный>2. Сохранять сообщение на диск с помощью исходящего шлюза:

<int-file:outbound-gateway id="fileBackUpChannelAdapter" 
directory="${file.location}"
request-channel="fileSaveChannel" reply-channel="loggerChannel" />

<сильный>3. подтвердите от кролика, когда персистент (шаг 2) завершится успешно.

для шага (3) я написал следующий код:

public class RabbitAcknowledgement {
public void handleRabbitAcks(Message<?> message) throws IOException {
com.rabbitmq.client.Channel channel = (Channel) 
message.getHeaders().get("amqp_channel");
long deliveryTag = (long) message.getHeaders().get("amqp_deliveryTag");
channel.basicAck(deliveryTag, false);
}

который я звоню с весны через:

<int:service-activator input-
channel="rabbitOutboundboundMessageChannel" output-
channel="routerChannel" ref="ackRabbit" method="handleRabbitAcks" />

Это не работает, и полезные данные кролика в моей очереди не подтверждены.

Мои вопросы:

  1. Нужна ли мне РУЧНАЯ проверка в этом сценарии?
  2. Что я делаю не так?

person Praveen Kumar    schedule 21.06.2017    source источник
comment
Какие симптомы доказывают, что это не работает, пожалуйста?   -  person Artem Bilan    schedule 21.06.2017
comment
Полезные нагрузки остаются на кролике и не потребляются потребителями.   -  person Praveen Kumar    schedule 21.06.2017
comment
Кроме того, в моей текущей настройке я считаю, что подтверждаю 1 сообщение на поток. Могу ли я как-то увеличить скорость (т.е. признать в пакетах?), Не критично для функции, но мне было интересно, могу ли я как-то сделать систему более эффективной   -  person Praveen Kumar    schedule 21.06.2017
comment
Как правило, вам не нужны ручные подтверждения; с AUTO контейнер подтвердит сообщение, если персистентность будет успешной, и подтвердит его, если возникнет исключение. Необычно использовать ручные подтверждения для такого простого сценария, как этот. Тем не менее, это должно работать. Вы запускаете ack в потоке контейнера? Или вы передаете другому потоку, используя очередь или канал исполнителя? Можете ли вы отредактировать вопрос, чтобы показать полную конфигурацию (включая каналы). Также включите ведение журнала DEBUG, чтобы узнать, предоставляет ли он дополнительную информацию.   -  person Gary Russell    schedule 21.06.2017
comment
Спасибо Гэри. Как будет работать AUTO, если в том же потоке будут выполняться другие нижестоящие процессы? Например. после настойчивости я хочу преобразовать полезную нагрузку и записать в solr. в настоящее время ACK происходит только тогда, когда и преобразование, и solr преуспевают, вызывая большую нагрузку на сервер кролика. Все эти процессы имеют свои конфиги (rabbit.xml, persist.xml, transform.xml, solr.xml и т.д.).   -  person Praveen Kumar    schedule 21.06.2017
comment
@GaryRussell Я отредактировал вопрос с полной конфигурацией.   -  person Praveen Kumar    schedule 21.06.2017
comment
Я не вижу никаких правок; да, в таком случае акк произойдет, когда все будет сделано; вы бы использовали ручные подтверждения, если хотите подтвердить перед solr. Однако, поскольку ваш handleRabbitAcks возвращает void; Я не вижу, как можно выполнять какую-либо дополнительную работу после того, как она вызвана.   -  person Gary Russell    schedule 21.06.2017
comment
@GaryRussell Прошу прощения. теперь вы должны увидеть изменения.   -  person Praveen Kumar    schedule 21.06.2017
comment
есть маршрутизатор, который отправляет сообщение на персистент и трансформер. persister является концом этого потока, но преобразователь продолжает другие операции и solr. Я хотел подтвердить после сохранения и продолжить трансформацию с исходной полезной нагрузкой. Следовательно, handleRabbitAcks недействителен. Надеюсь, мое объяснение имеет смысл. Я загрузил 3 конфига в редактирование.   -  person Praveen Kumar    schedule 21.06.2017
comment
@GaryRussell Ручное подтверждение кажется медленнее, чем автоматическое подтверждение с той же нагрузкой. Есть ли здесь класс и метод для ручного открытия горлышка бутылки? Любые советы по улучшению скорости подтверждения будут очень полезны. Использование потребителя составляет всего 2% с этим изменением   -  person Praveen Kumar    schedule 23.06.2017
comment
Я не могу представить, почему для вашего варианта использования будет большая разница. Что вы имеете в виду под медленнее? Я не вижу, чтобы слушатель Vs. контейнер, вызывающий basicAck(), не имел бы никакого значения. Возможно, вам нужно увеличить prefetchCount, но, опять же, не должно иметь значения, кто делает acks. Если вы используете режим AUTO ack и увеличиваете txSize, контейнер будет отправлять подтверждение каждые txSize сообщения (но увеличится вероятность повторной доставки).   -  person Gary Russell    schedule 23.06.2017
comment
@GaryRussell Я провел тест производительности для более старой настройки (подтверждение после записи в solr с использованием AUTO ack) и более новой настройки (используйте MANUAL ack, а затем запишите в solr). Скорость доставки или подтверждения намного ниже для РУЧНЫХ подтверждений по сравнению с АВТО (разница в 2 раза)   -  person Praveen Kumar    schedule 23.06.2017
comment
Что-то еще должно было измениться; Я не могу представить, что перемещение basicAck на более раннее значение будет иметь большое значение; на самом деле, при предварительной выборке по умолчанию (1) следующее сообщение, скорее всего, будет доступно раньше. Я предлагаю вам пересмотреть ваши изменения.   -  person Gary Russell    schedule 23.06.2017


Ответы (1)


Он должен работать нормально; Я только что провел быстрый тест, и он работает для меня...

@SpringBootApplication
public class So44666444Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(So44666444Application.class, args).close();
    }

    @Autowired
    private RabbitTemplate template;

    private final CountDownLatch latch = new CountDownLatch(1);

    @Override
    public void run(String... args) throws Exception {
        this.template.convertAndSend("foo", "bar");
        latch.await();
    }

    @Bean
    public AmqpInboundChannelAdapter adapter(ConnectionFactory cf) {
        AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer(cf));
        adapter.setOutputChannelName("ack");
        return adapter;
    }

    @Bean
    public AbstractMessageListenerContainer listenerContainer(ConnectionFactory cf) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setQueueNames("foo");
        return container;
    }

    @ServiceActivator(inputChannel = "ack")
    public void ack(@Header(AmqpHeaders.CHANNEL) Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long tag)
            throws IOException {
        System.out.println("Acking: " + tag);
        channel.basicAck(tag, false);
        latch.countDown();
    }

}

Если я установлю точку останова на basicAck, я увижу сообщение в консоли как снятое; переход на следующую строку, и сообщение удаляется.

person Gary Russell    schedule 21.06.2017
comment
Спасибо @Gary Russell. Я нашел ошибку в своем коде и заставил ее работать: загрузил рабочую конфигурацию в исходный пост. Есть ли способ сделать это более эффективным? (быстрее получать, использовать меньше ресурсов и т. д.) - person Praveen Kumar; 21.06.2017