[EDIT] Загрузка полных конфигураций:
rabbit.xml, который удаляется из очереди Rabbit
<rabbit:connection-factory id="amqpConnectionFactoryInbound"
host="${rabbit.host}" port="${rabbit.port}"
username="${rabbit.username}" password="${rabbit.password}" channel-
cache-size="5"
connection-factory="rabbitConnectionFactoryInbound"/>
<beans:bean id="rabbitConnectionFactoryInbound"
class="com.rabbitmq.client.ConnectionFactory">
<beans:property name="requestedHeartbeat"
value="60" />
</beans:bean>
<!-- Inbound Adapter to AMQP RabbitMq and write to file -->
<int-amqp:inbound-channel-adapter id="rabbitMQInboundChannelAdapter"
channel="rabbitInboundMessageChannel"
concurrent-consumers="8" task-
executor="rabbit-executor" connection-
factory="amqpConnectionFactoryInbound"
message-converter="byteArrayToStringConverter" queue-
names="${rabbit.queue}" acknowledge-mode="MANUAL" error-
channel="errorChannelId"
prefetch-count="25" />
<header-enricher input-channel="rabbitInboundMessageChannel" output-
channel="rabbitOutboundboundMessageChannel">
<int:header name="Operation" value="${operation.rabbit}" />
<int:header name="GUID" expression="#{
'T(java.util.UUID).randomUUID().toString()' }" />
<int:header name="operationStartTime" expression="#{
'T(java.lang.System).currentTimeMillis()' }" />
</header-enricher>
<int:channel id="rabbitOutboundboundMessageChannel">
<int:interceptors>
<int:wire-tap channel="loggerChannel" />
</int:interceptors>
</int:channel>
<task:executor id="rabbit-executor" rejection-policy="CALLER_RUNS"
pool-size="10-30"
queue-capacity="25" />
</beans:beans>
Затем сообщение отправляется на канал маршрутизатора: router.xml
<int:header-enricher input-channel="rabbitOutboundboundMessageChannel"
output-channel="routerChannel">
<int:header name="Operation" value="${operation.router}"
overwrite="true" />
<int:header name="file_name" expression="headers['GUID'] + '.xml'" />
<int:header name="operationStartTime" expression="#{
'T(java.lang.System).currentTimeMillis()' }"
overwrite="true" />
<int:error-channel ref="errorChannelId" />
</int:header-enricher>
<int:recipient-list-router id="rabbitMsgrouter" input-
channel="routerChannel">
<int:recipient channel="fileBackupChannel" selector-expression="new
String(payload).length()>0" />
<int:recipient channel="transformerChannel" />
</int:recipient-list-router>
<int:channel id="transformerChannel">
<int:interceptors>
<int:wire-tap channel="loggerChannel" />
</int:interceptors>
</int:channel>
<int:channel id="fileBackupChannel"/>
<int:channel id="loggerChannel"/>
</beans>
Сообщение теперь отправляется в файл persister.xml и transform.xml. Ниже приведен файл persister.xml, и я хочу подтвердить его успешность. Существуют и другие нижестоящие процессы после transform.xml
<int:header-enricher input-channel="fileBackupChannel" output-
channel="fileSaveChannel">
<int:header name="Operation" value="${operation.filePersister}"
overwrite="true" />
<int:header name="replyChannel" value="nullChannel" />
<int:header name="operationStartTime" expression="#{
'T(java.lang.System).currentTimeMillis()' }" />
<int:error-channel ref="errorChannelId" />
</int:header-enricher>
<int-file:outbound-gateway id="fileBackUpChannelAdapter"
directory="${file.location}"
request-channel="fileSaveChannel" reply-channel="rabbitAckChannel"/>
<int:service-activator input-channel="rabbitAckChannel" output-
channel="nullChannel" ref="ackRabbit" method="handleRabbitAcks" />
<bean id="ackRabbit"
class="com.expedia.dataloader.rabbit.RabbitAcknowledgement"/>
<int:channel id="rabbitAckChannel">
<int:interceptors>
<int:wire-tap channel="loggerChannel" />
</int:interceptors>
</int:channel>
<int:channel id="loggerChannel"/>
<int:channel id="fileSaveChannel"/>
</beans>
У меня возникли проблемы с ручным получением полезной нагрузки от rabbitmq.
Это мой рабочий процесс:
<сильный>1. Получить сообщение от кролика с помощью адаптера входящего канала:
<int-amqp:inbound-channel-adapter id="rabbitMQInboundChannelAdapter"
channel="rabbitInboundMessageChannel"
concurrent-consumers="${rabbit.concurrentConsumers}" task-
executor="rabbit-executor" connection-
factory="amqpConnectionFactoryInbound"
message-converter="byteArrayToStringConverter" queue-
names="${rabbit.queue}" acknowledge-mode="MANUAL" error-
channel="errorChannelId"
prefetch-count="${rabbit.prefetchCount}" />
<сильный>2. Сохранять сообщение на диск с помощью исходящего шлюза:
<int-file:outbound-gateway id="fileBackUpChannelAdapter"
directory="${file.location}"
request-channel="fileSaveChannel" reply-channel="loggerChannel" />
<сильный>3. подтвердите от кролика, когда персистент (шаг 2) завершится успешно.
для шага (3) я написал следующий код:
public class RabbitAcknowledgement {
public void handleRabbitAcks(Message<?> message) throws IOException {
com.rabbitmq.client.Channel channel = (Channel)
message.getHeaders().get("amqp_channel");
long deliveryTag = (long) message.getHeaders().get("amqp_deliveryTag");
channel.basicAck(deliveryTag, false);
}
который я звоню с весны через:
<int:service-activator input-
channel="rabbitOutboundboundMessageChannel" output-
channel="routerChannel" ref="ackRabbit" method="handleRabbitAcks" />
Это не работает, и полезные данные кролика в моей очереди не подтверждены.
Мои вопросы:
- Нужна ли мне РУЧНАЯ проверка в этом сценарии?
- Что я делаю не так?
handleRabbitAcks
возвращает void; Я не вижу, как можно выполнять какую-либо дополнительную работу после того, как она вызвана. - person Gary Russell   schedule 21.06.2017basicAck()
, не имел бы никакого значения. Возможно, вам нужно увеличитьprefetchCount
, но, опять же, не должно иметь значения, кто делает acks. Если вы используете режим AUTO ack и увеличиваетеtxSize
, контейнер будет отправлять подтверждение каждыеtxSize
сообщения (но увеличится вероятность повторной доставки). - person Gary Russell   schedule 23.06.2017basicAck
на более раннее значение будет иметь большое значение; на самом деле, при предварительной выборке по умолчанию (1) следующее сообщение, скорее всего, будет доступно раньше. Я предлагаю вам пересмотреть ваши изменения. - person Gary Russell   schedule 23.06.2017