Сохранять список сущностей с помощью ‹int-jpa: outbound-channel-adapter

Возможно, мне здесь что-то не хватает, но я не могу найти никаких документов о сохранении списка сущностей с использованием Spring Integration JPA Outbound Channel Adapter. У меня есть чрезвычайно простой сценарий, который должен опросить базу данных и скопировать новые данные в другую базу данных. Таким образом:

<int-jpa:inbound-channel-adapter channel="myChannel"
    entity-class="my.package.MyClass"
    entity-manager="mysqlEntityManager"
    auto-startup="true">
    <int:poller fixed-rate="5000">
        <int:transactional propagation="REQUIRED" transaction-manager="mysqlTransactionManager"/>
    </int:poller>
</int-jpa:inbound-channel-adapter>

<int:channel id="myChannel" />

<int-jpa:outbound-channel-adapter channel="myChannel" 
    entity-class="my.package.MyClass"
    entity-manager="hsqldbEntityManager">
    <int-jpa:transactional transaction-manager="hsqldbTransactionManager" />                                    
</int-jpa:outbound-channel-adapter>

inbound-channel-adapter возвращает List<MyClass>, а затем outbound-channel-adapter выводит трассировку стека ниже ...

Как мне настроить адаптер для приема списка объектов? Я не хочу устанавливать максимальное количество сообщений для каждого опроса во входящем адаптере / опросчике, так как я бы предпочел минимальные операции с базой данных.

Любая помощь приветствуется.

Документы Spring: http://static.springsource.org/spring-integration/docs/3.0.0.M1/reference/html/jpa.html

[task-scheduler-1] ERROR org.springframework.integration.handler.LoggingHandler - org.springframework.integration.MessageHandlingException: error occurred in message handler [org.springframework.integration.jpa.outbound.JpaOutboundGateway@165d2be]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:79)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:319)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:110)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
    at $Proxy24.handleMessage(Unknown Source)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
    at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
    at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:97)
    at org.springframework.integration.endpoint.AbstractTransactionSynchronizingPollingEndpoint.doPoll(AbstractTransactionSynchronizingPollingEndpoint.java:82)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:144)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:319)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:110)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
    at $Proxy23.call(Unknown Source)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:236)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:231)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:53)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalArgumentException: Unknown entity: java.util.ArrayList
    at org.hibernate.ejb.AbstractEntityManagerImpl.merge(AbstractEntityManagerImpl.java:692)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:240)
    at $Proxy18.merge(Unknown Source)
    at org.springframework.integration.jpa.core.DefaultJpaOperations.merge(DefaultJpaOperations.java:175)
    at org.springframework.integration.jpa.core.JpaExecutor.executeOutboundJpaOperation(JpaExecutor.java:223)
    at org.springframework.integration.jpa.outbound.JpaOutboundGateway.handleRequestMessage(JpaOutboundGateway.java:81)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    ... 48 more
Caused by: org.hibernate.MappingException: Unknown entity: java.util.ArrayList
    at org.hibernate.impl.SessionFactoryImpl.getEntityPersister(SessionFactoryImpl.java:693)
    at org.hibernate.impl.SessionImpl.getEntityPersister(SessionImpl.java:1485)
    at org.hibernate.event.def.DefaultMergeEventListener.onMerge(DefaultMergeEventListener.java:232)
    at org.hibernate.event.def.DefaultMergeEventListener.onMerge(DefaultMergeEventListener.java:84)
    at org.hibernate.impl.SessionImpl.fireMerge(SessionImpl.java:867)
    at org.hibernate.impl.SessionImpl.merge(SessionImpl.java:851)
    at org.hibernate.impl.SessionImpl.merge(SessionImpl.java:855)
    at org.hibernate.ejb.AbstractEntityManagerImpl.merge(AbstractEntityManagerImpl.java:686)
    ... 59 more

person DairyLea    schedule 23.05.2013    source источник


Ответы (2)


Исходящий адаптер JPA в настоящее время не поддерживает сохранение списка сущностей; не стесняйтесь открывать новый выпуск JIRA. Вы можете добавить простой <splitter/> перед исходящим адаптером (без атрибутов, только каналы ввода и вывода), и он разделит список на отдельные объекты.

Однако с вашей текущей конфигурацией каждое сохранение будет выполняться в отдельной транзакции.

Тем не менее, вы, вероятно, все равно захотите синхронизировать две транзакции, поэтому вам действительно следует запускать транзакцию hsql одновременно с транзакцией mysql, чтобы Spring приложил все усилия для синхронизации двух транзакций (зафиксируйте их как можно ближе друг к другу).

Для этого вам понадобится что-то вроде ChainedTransactionManager в " Best Efforts 1PC "отличной статьи Дэйва Сайера на эту тему.

Таким образом, каждая из разделенных сущностей будет зафиксирована в одной транзакции.

person Gary Russell    schedule 23.05.2013
comment
Спасибо за ваш ответ. Я думаю, мне нужно больше читать о транзакциях и менеджерах транзакций! На данный момент я только что направил список в POJO и продолжаю вручную ... Я открою заявку, если у меня будет время. Извините, не могу проголосовать за ваш ответ, так как у меня недостаточно репутации :( - person DairyLea; 23.05.2013

Я создал Jira, чтобы добиться этого улучшения:

https://jira.springsource.org/browse/INT-3029

В качестве временного решения вы также можете расширить DefaultJpaOperations (org.springframework.integration.jpa.core) и настроить метод persist (), позволяющий обрабатывать тип коллекции параметры. Подробнее см .:

Пакетные вставки с JPA / EJB3

Адаптеры имеют атрибут пространства имен XML jpa-operations, позволяющий указать ссылку на пользовательские реализации. Для получения дополнительных сведений см. http://static.springsource.org/spring-integration/docs/latest-ga/reference/html/jpa.html#jpa-namespace-support-common-attributes

person Gunnar Hillert    schedule 23.05.2013