Добавете пакетни възможности към интеграционния поток

В момента имам пролетен интеграционен поток, който работи добре (вижте връзката за диаграма). Бих искал да добавя партида към текущата си конфигурация, за да позволя повторен опит с експоненциално забавяне, схема на прекъсвач и продължаващи задания към базата данни за рестартиране.

Интеграционният поток се състои от шлюз, който приема съобщение‹MyObj›, което в крайна сметка се насочва към трансформатор, който преобразува съобщение‹MyObj› в съобщение‹низ›. След това агрегаторът взема Message‹String› и в крайна сметка освобождава конкатениран Message‹String› (използвайки както стратегия за освобождаване на размера, така и MessageGroupStoreReaper с таймаут). След това свързаният низ е полезният товар на файла, качен чрез SFTP адаптер за изходящ канал.

Търсих, прочетох документи, разгледах тонове примери и не мога да разбера как да капсулирам последната стъпка от процеса в пакетно задание. Имам нужда от възможността да опитам отново да кача низа (като полезен товар на файла), ако има проблем с SFTP връзката или друго изключение, възникнало по време на качването. Също така искам да мога да рестартирам (използвайки JobRepository, поддържан от база данни) в случай на някакъв отказ, така че не мисля, че използването на Retry Advice е достатъчно.

Моля, обяснете и ми помогнете да разбера как да свържа заедно частите и кои да използвам (job-launching-gateway, MessageToJobRequest Transformer, ItemReader, ItemWriter??). Също така не съм сигурен как да осъществя достъп до всяко съобщение‹низ› и да го изпратя до SFTP канален адаптер вътре в задача, стъпка или задача.

Текущ поток: http://i.stack.imgur.com/GnurV.png


person Zach Z    schedule 09.09.2014    source източник


Отговори (1)


Първо нека да разгледаме как можем да преодолеем вашите изисквания без Batch.

  1. <int-sftp:outbound-channel-adapter> има <request-handler-advice-chain>, където можете да конфигурирате RequestHandlerRetryAdvice и RequestHandlerCircuitBreakerAdvice.

  2. За да постигнете опцията restartable, можете да направите входния канал на този адаптер като постоянна опашка с message-store.

Сега относно партидата.

За да стартирате задание от интеграционния поток, трябва да напишете малко MessageToJobRequest и да използвате <batch-int:job-launching-gateway> след това. Тук, разбира се, можете да поставите вашите payload към jobParameters.

За да изпратите съобщение от Job до някакъв канал (напр. към sftp адаптер), можете да използвате org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter.

Прочетете повече тук: http://docs.spring.io/spring-batch/reference/html/springBatchIntegration.html

person Artem Bilan    schedule 10.09.2014