Camel Route / ActiveMQ, демаршалинг JSON и отправка в методы

У меня есть ответ JSON, отправляемый в мою тему activemq. Я настроил свой маршрут, чтобы демаршалировать этот ответ в POJO. Затем я хочу последовательно пройти через 3 различных метода, которые используют поля в POJO, которые были заполнены объектом JSON.

В основном это то, что я пытаюсь сделать. Моя проблема заключается в том, что я все еще новичок в Camel, и я не уверен, что / как мне следует передавать информацию из маршрута в мои методы, чтобы я мог использовать POJO, который был заполнен значениями JSON. .

Если это вообще имеет смысл. Я собираюсь опубликовать свой Camel Route и класс Java ниже, я воздержусь от pojo, потому что он достаточно прост, всего 10 полей, которые различаются между int и string с некоторыми методами getter / setter.

EnrollResponse - это название POJO.

Любая помощь или руководство / советы по этому поводу были бы очень признательны!

ИЗМЕНИТЬ / ОБНОВИТЬ:

В консоли в eclipse я получаю Sys из моего метода «установки», поэтому я знаю, что он, по крайней мере, доходит до этого шага. В моих журналах ошибки начинаются именно здесь:

15: 38: 11,919 DEBUG [читать # 0 - JmsConsumer [Test.Central]] SendProcessor: >>>> Конечная точка [bean: // TriggeredSendBean? Method = setup] Exchange [JmsMessage [JmsMessageID: ID: LT_John-51650-1363715888983- 3: 2: 1: 1: 1]]

15: 38: 12,396 ИНФОРМАЦИЯ [читать # 0 - JmsConsumer [Test.Central]] ReflectionServiceFactoryBean: создание службы {http://testAPI.com/wsdl/partnerAPI} PartnerAPI из WSDL: файл: ресурсы / META-INF / framework.wsdl

15: 38: 13,159 DEBUG [read # 0 - JmsConsumer [Test.Central]] DefaultErrorHandler: Неудачная доставка для (MessageId: topic_Test.Central_ID_LT_John-51650-1363715888983-3_2

<?xml version="1.0" encoding="UTF-8"?>
<!-- Configures the Camel Context-->

<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:camel="http://camel.apache.org/schema/spring"
   xsi:schemaLocation="
   http://www.springframework.org/schema/beans    http://www.springframework.org/schema/beans/spring-beans.xsd
   http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

<!-- load properties --> 

<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations" value="file:backend.properties" /> 
</bean>
<bean id="properties" class="org.apache.camel.component.properties.PropertiesComponent">
    <property name="location" value="file:backend.properties" /> 
</bean>

<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://0.0.0.0:61616?useLocalHost=true" /> 
</bean>

<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="maxConnections" value="8" /> 
    <property name="maximumActive" value="500" /> 
    <property name="connectionFactory" ref="jmsConnectionFactory" /> 
</bean>

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="pooledConnectionFactory" /> 
    <property name="transacted" value="false" /> 
    <property name="concurrentConsumers" value="1" /> 
</bean>

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsConfig" /> 
</bean>

<!-- Custom Loaded Beans -->

<bean id="TriggeredSendBean" class="com.backend.trigger.ClientTest"/>
<bean id="EnrollResponse" class="com.testObjects.EnrollResponse" />

 <!-- camel configuration --> 

<camel:camelContext xmlns="http://camel.apache.org/schema/spring">
    <camel:dataFormats>
        <json id="UnmarshalToPOJO" library="Jackson" unmarshalTypeName="com.testObjects.EnrollResponse" />
     </camel:dataFormats>

    <camel:route id="genericMessageHandler" streamCache="true">
    <from uri="activemq:topic:Test.Central" /> 
    <unmarshal ref="UnmarshalToPOJO" /> 


    <to uri = "bean:TriggeredSendBean?method=setup" /> 
    <to uri = "bean:TriggeredSendBean?method=addSubscriberAllList" />
    <to uri = "bean:TriggeredSendBean?method=sendWelcomeEmail" />

    </camel:route>
</camel:camelContext>
1_1 на ExchangeId: ID-LT-John-52743-1361). 1). При попытке доставки: 0 поймано: org.apache.camel.CamelExecutionException: исключение произошло во время выполнения на обмене: Exchange [JmsMessage [JmsMessageID: ID: LT_John-51650-1363715888983-3: 2: 1: 1: 1]]

15: 38: 13,160 ОШИБКА [чтение # 0 - JmsConsumer [Test.Central]] DefaultErrorHandler: Неудачная доставка для (MessageId: topic_Test.Central_ID_LT_John-51650-1363715888983-3_2

<?xml version="1.0" encoding="UTF-8"?>
<!-- Configures the Camel Context-->

<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:camel="http://camel.apache.org/schema/spring"
   xsi:schemaLocation="
   http://www.springframework.org/schema/beans    http://www.springframework.org/schema/beans/spring-beans.xsd
   http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

<!-- load properties --> 

<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations" value="file:backend.properties" /> 
</bean>
<bean id="properties" class="org.apache.camel.component.properties.PropertiesComponent">
    <property name="location" value="file:backend.properties" /> 
</bean>

<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://0.0.0.0:61616?useLocalHost=true" /> 
</bean>

<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="maxConnections" value="8" /> 
    <property name="maximumActive" value="500" /> 
    <property name="connectionFactory" ref="jmsConnectionFactory" /> 
</bean>

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="pooledConnectionFactory" /> 
    <property name="transacted" value="false" /> 
    <property name="concurrentConsumers" value="1" /> 
</bean>

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsConfig" /> 
</bean>

<!-- Custom Loaded Beans -->

<bean id="TriggeredSendBean" class="com.backend.trigger.ClientTest"/>
<bean id="EnrollResponse" class="com.testObjects.EnrollResponse" />

 <!-- camel configuration --> 

<camel:camelContext xmlns="http://camel.apache.org/schema/spring">
    <camel:dataFormats>
        <json id="UnmarshalToPOJO" library="Jackson" unmarshalTypeName="com.testObjects.EnrollResponse" />
     </camel:dataFormats>

    <camel:route id="genericMessageHandler" streamCache="true">
    <from uri="activemq:topic:Test.Central" /> 
    <unmarshal ref="UnmarshalToPOJO" /> 


    <to uri = "bean:TriggeredSendBean?method=setup" /> 
    <to uri = "bean:TriggeredSendBean?method=addSubscriberAllList" />
    <to uri = "bean:TriggeredSendBean?method=sendWelcomeEmail" />

    </camel:route>
</camel:camelContext>
1_1 на ExchangeId: ID-LT-John-5218843-13658 1). Исчерпано после попытки доставки: 1 поймано: org.apache.camel.CamelExecutionException: исключение произошло во время выполнения на обмене: Exchange [JmsMessage [JmsMessageID: ID: LT_John-51650-1363715888983-3: 2: 1: 1: 1]]

org.apache.camel.CamelExecutionException: исключение произошло во время выполнения на обмене: Exchange [JmsMessage [JmsMessageID: ID: LT_John-51650-1363715888983-3: 2: 1: 1: 1]]


<?xml version="1.0" encoding="UTF-8"?>
<!-- Configures the Camel Context-->

<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:camel="http://camel.apache.org/schema/spring"
   xsi:schemaLocation="
   http://www.springframework.org/schema/beans    http://www.springframework.org/schema/beans/spring-beans.xsd
   http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

<!-- load properties --> 

<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations" value="file:backend.properties" /> 
</bean>
<bean id="properties" class="org.apache.camel.component.properties.PropertiesComponent">
    <property name="location" value="file:backend.properties" /> 
</bean>

<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://0.0.0.0:61616?useLocalHost=true" /> 
</bean>

<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="maxConnections" value="8" /> 
    <property name="maximumActive" value="500" /> 
    <property name="connectionFactory" ref="jmsConnectionFactory" /> 
</bean>

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="pooledConnectionFactory" /> 
    <property name="transacted" value="false" /> 
    <property name="concurrentConsumers" value="1" /> 
</bean>

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsConfig" /> 
</bean>

<!-- Custom Loaded Beans -->

<bean id="TriggeredSendBean" class="com.backend.trigger.ClientTest"/>
<bean id="EnrollResponse" class="com.testObjects.EnrollResponse" />

 <!-- camel configuration --> 

<camel:camelContext xmlns="http://camel.apache.org/schema/spring">
    <camel:dataFormats>
        <json id="UnmarshalToPOJO" library="Jackson" unmarshalTypeName="com.testObjects.EnrollResponse" />
     </camel:dataFormats>

    <camel:route id="genericMessageHandler" streamCache="true">
    <from uri="activemq:topic:Test.Central" /> 
    <unmarshal ref="UnmarshalToPOJO" /> 


    <to uri = "bean:TriggeredSendBean?method=setup" /> 
    <to uri = "bean:TriggeredSendBean?method=addSubscriberAllList" />
    <to uri = "bean:TriggeredSendBean?method=sendWelcomeEmail" />

    </camel:route>
</camel:camelContext>


public class ClientTest 
{
    static String user = null;
    static String password = null;
    static String customerKeyWelcomeEmailTest = null;
    static String validFromAddress = null;
    static String validFromName = null;

public static void setup(Exchange exchange) 
{   

    System.out.println("Exchange " + exchange.toString());

    //Retrieve settings from properties file
    Properties properties = getProperties();
    user = properties.getProperty("user");
    password = properties.getProperty("password");
    customerKeyWelcomeEmailTest = properties.getProperty("customerKeyWelcomeEmailTest");  
    validFromAddress = properties.getProperty("validFromAddress");
    validFromName = properties.getProperty("validFromName");


    System.out.println("user==> " + user);
    System.out.println("password==> " + password);
    System.out.println("customerKey==> " + customerKeyWelcomeEmailTest);
    System.out.println("validFromAddress==> " + validFromAddress);
    System.out.println("validFromName==> " + validFromName);

    //Create PartnerAPI stub.
    PartnerAPI service = new PartnerAPI();
    Soap stub = service.getSoap();
} 

/**
 * Adding specific subscriber to the "All subscribers" list in
 */

  private static void addSubscriberAllList(Soap stub, Exchange exchange, EnrollResponse enrollResponse) 
  {

      // Checking to see if enrollResponse is being populated
      System.out.println(enrollResponse.getEmail());
      System.out.println(enrollResponse.getFirstname());
      System.out.println(enrollResponse.getLastname());
      System.out.println(enrollResponse.getAcctid());
      System.out.println(enrollResponse.getCid());
      System.out.println(enrollResponse.getMyfridays());
      System.out.println(enrollResponse.getPhone());
      System.out.print(enrollResponse.getPoints());

      Subscriber subscriber = new Subscriber();

      subscriber.setEmailAddress(enrollResponse.getEmail());
      subscriber.setSubscriberKey(enrollResponse.getAcctid());  
      subscriber.setStatus(SubscriberStatus.ACTIVE);

      Attribute a1 = new Attribute();
      a1.setName("firstname");
      a1.setValue(enrollResponse.getFirstname());
      //Can add more attributes as needed

      Attribute[] AttributeLists = {a1};
      subscriber.getAttributes().addAll(Arrays.asList(AttributeLists));

      APIObject[] apiObjects = {subscriber};

      try
      {
          CreateRequest createRequest = new CreateRequest();
          createRequest.setOptions(new CreateOptions());
          createRequest.getObjects().addAll(Arrays.asList(apiObjects));

          CreateResponse createResponse = stub.create(createRequest);
          System.out.println("Subscriber created in all subscriber List: " + createResponse.getOverallStatus());
      } catch (Exception e)
      {
          e.printStackTrace();
      }
  } 

/**
 * Testing Triggered Send SPECIFIC SUBSCRIBER 
 */

private static void sendWelcomeEmail(Soap stub, Exchange exchange, EnrollResponse enrollResponse) 
{
    Subscriber[] testArray = new Subscriber[1];
    Owner ownerSubscriberValid = new Owner();


    System.out.println("****************** STARTING TRIGGERED SEND TEST ******************");

    //Specify TriggeredSendDefinition and initialize the TriggeredSend
    TriggeredSendDefinition triggeredSendDefinition = new TriggeredSendDefinition();
    triggeredSendDefinition.setCustomerKey(customerKeyWelcomeEmailTest); 
    TriggeredSend triggeredSend = new TriggeredSend();
    triggeredSend.setTriggeredSendDefinition(triggeredSendDefinition);

    //Create a valid Subscriber
    Subscriber subscriberValid = new Subscriber();
    subscriberValid.setEmailAddress(enrollResponse.getEmail()); 
    subscriberValid.setSubscriberKey(enrollResponse.getAcctid());
    ownerSubscriberValid.setFromAddress(validFromAddress);
    ownerSubscriberValid.setFromName(validFromName);
    subscriberValid.setOwner(ownerSubscriberValid);




    //Populate array of Subscribers
    testArray[0] = subscriberValid;
    java.util.List<Subscriber> list = Arrays.asList(testArray);        
    triggeredSend.getSubscribers().addAll( list ); 

    //Send the TriggeredSend using Create call
    try{
      CreateRequest createRequest = new CreateRequest();
      CreateOptions createOptions = new CreateOptions();
      createRequest.setOptions(createOptions);
      java.util.List<APIObject> listAPIObject = Arrays.asList(new APIObject[] {triggeredSend});
      createRequest.getObjects().addAll(listAPIObject);
      CreateResponse createResponse = stub.create(createRequest);

      System.out.println( "overall status message: " + createResponse.getOverallStatus() );
      java.util.List<CreateResult> listCreateResult = createResponse.getResults();
      CreateResult[] createResult = listCreateResult.toArray(new CreateResult[listCreateResult.size()]);

      for ( CreateResult status : createResult )
      {
          System.out.println("create status message: " + status.getStatusMessage());
          System.out.println("create status code: " + status.getStatusCode());
      }       

      //Validate the send and get failure information
      TriggeredSendCreateResult triggeredSendCreateResult = (TriggeredSendCreateResult)createResult[0];
      assert(triggeredSendCreateResult.getSubscriberFailures() != null);
      System.out.println("size of failures array: " + triggeredSendCreateResult.getSubscriberFailures().size());
      System.out.println("email address:     " + triggeredSendCreateResult.getSubscriberFailures().get(0).getSubscriber().getEmailAddress());
      System.out.println("error description: " + triggeredSendCreateResult.getSubscriberFailures().get(0).getErrorDescription());
      System.out.println("error code:        " + triggeredSendCreateResult.getSubscriberFailures().get(0).getErrorCode());
      assert( createResult != null );
      assert(createResult[0].getStatusMessage().equals("OK"));
      System.out.println( "****************** ENDING TRIGGERED SEND TEST ******************" );   
    } catch(Exception e) {
        e.printStackTrace();
    }
}    

/**
 * Retrieve properties file.
 * 
 */
public static Properties getProperties() 
{
    Properties properties = new Properties();
    try{
        FileInputStream fileInputStream = new FileInputStream("properties.xml");
        properties.loadFromXML(fileInputStream);
    } catch (Exception e){
        e.printStackTrace();
    }
    return properties;
} 

}


person parchambeau    schedule 19.03.2013    source источник


Ответы (1)


Camel имеет сильную интеграцию с компонентами, и на веб-сайте Camel есть много документации по этому поводу, например, некоторые из этих ссылок

Если у вас есть книга Camel in Action, то в главе 4 рассказывается все об использовании beans с Camel.

person Claus Ibsen    schedule 20.03.2013
comment
Спасибо, что просмотрели документы. Они были полезны. У меня был только один вопрос, когда я использую Jackson для демаршалинга сообщения, и у меня есть dataFormat, указывающий на мой базовый POJO. Будет ли он автоматически сопоставлять поля JSON с моими правильными полями POJO? Я просто не уверен, как работает лежащий в основе Джексон. - person parchambeau; 20.03.2013