Отправить MessageProperties [priority=anyInteger] при публикации сообщения в RabbitMQ

мы используем Rabbit MQ и Spring Integration в нашем проекте. Каждое сообщение имеет режим доставки, заголовок, свойства и часть полезной нагрузки. Мы хотим добавить свойства, т. е. приоритет со значением 2 (любое целое число), полезную нагрузку с «тестовым сообщением 3» и опубликовать сообщение в очереди с именем OES. пожалуйста, смотрите снимок экрана.

Добавление свойств[priority=3] в сообщение rabbitmq

Как добавить свойства сообщения, т.е. приоритет = 2 (или любое значение) в нижеприведенном адаптере исходящего канала (интеграция Spring). Я знаю, что мы можем добавить «заголовки», добавив в «сопоставленные заголовки запроса», но я хотел бы добавить свойства. Для MessageProperties в «адаптере исходящего канала» не определены свойства. Есть ли способ преодолеть эту проблему.

У нас нет проблем с полезной нагрузкой, она уже идет. мы хотим добавить только MessageProperties с приоритетом = 2 (любое значение). как добавить это в адаптер исходящего канала (нет необходимости в жестком кодировании, он должен быть универсальным)?

<!-- the mapped-request-headers should be symmetric with 
     the list on the consumer side defined in consumerbeans.consumerHeaderMapper() -->
<int-amqp:outbound-channel-adapter id="publishingAmqpAdapter" 
    channel="producer-processed-event-channel" 
    amqp-template="amqpPublishingTemplate"
    exchange-name="events_forwarding_exchange"
    routing-key-expression="headers['routing-path']"
    mapped-request-headers="X-CallerIdentity,routing-path,content-type,route_to*,event-type,compression-state,STANDARD_REQUEST_HEADERS"
/>

Другая конфигурация:

<!-- chain routes and transforms the ApplicationEvent into a json string -->
<int:chain id="routingAndTransforming"
    input-channel="producer-inbound-event-channel"
    output-channel="producer-routed-event-channel">
    <int:transformer ref="outboundMessageTracker"/>
    <int:transformer ref="messagePropertiesTransformer"/>
    <int:transformer ref="eventRouter"/>
    <int:transformer ref="eventToJsonTransformer"/>
</int:chain>

<int:transformer id="messagePayloadCompressor" 
   input-channel="compress-message-payload" 
   output-channel="producer-processed-event-channel"
   ref="payloadCompressor"/>

@Configuration("amqpProducerBeans")
@ImportResource(value = "classpath:com/apple/store/platform/events/si/event-producer-flow.xml")
public class AmqpProducerBeans {

    @Bean(name = { "amqpPublishingTemplate" })
        public AmqpTemplate amqpTemplate() {
            logger.debug("creating amqp publishing template");
            RabbitTemplate rabbitTemplate = new RabbitTemplate(producerConnectionFactory());
            SimpleMessageConverter converter = new SimpleMessageConverter();
            // following needed for retry logic
            converter.setCreateMessageIds(true);
            rabbitTemplate.setMessageConverter(converter);
            return rabbitTemplate;
        }

/*Other code commented */

}

Другой код:

import org.springframework.integration.Message;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.message.GenericMessage;

public class PayloadCompressor {

    @Transformer
    public Message<byte[]> compress(Message<String> message){
        /* some code commented */

        Map<String, Object> headers = new HashMap<String, Object>();
        headers.putAll(message.getHeaders());
        headers.remove("compression-state");
        headers.put("compression-state", CompressionState.COMPRESSED);
        Message<byte[]> compressedMessage = new GenericMessage<byte[]>(compressedPayload, headers);
      return compressedMessage;


    }

Если мы не используем интеграцию Spring, мы можем использовать channel.basicPublish ниже и отправить MessageProperties.

ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost("/");
factory.setHost("10.102.175.30");
factory.setUsername("rahul");
factory.setPassword("rahul");
factory.setPort(5672);
Connection connection = factory.newConnection();
System.out.println("got connection "+connection);
Channel channel = connection.createChannel();
MessageProperties msgproperties= new MessageProperties() ;
MessageProperties.BASIC.setPriority(3);
// set Messageproperties with priority
    String exchangeName = "HeaderExchange";
      String routingKey = "testkey";
      //routingkey
      byte[] messageBodyBytes = "Message having priority value 3".getBytes();
      channel.basicPublish(exchangeName,
                           routingKey,
                           true,
                           msgproperties.BASIC,
                           messageBodyBytes);

Пожалуйста, дайте мне знать, если вам нужна дополнительная информация.


person Raghu    schedule 21.04.2015    source источник


Ответы (1)


Свойства уже сопоставляются автоматически — см. сопоставитель заголовков.

Просто используйте <header-enricher/>, чтобы установить соответствующий заголовок, и он будет сопоставлен с правильным свойством. В случае приоритета константа имеет вид здесь для специфичных для amqp констант заголовка, см. здесь.

person Gary Russell    schedule 21.04.2015
comment
Это сработало. Гэри Рассел, ты потрясающий. большое спасибо. - person Raghu; 24.04.2015