Добавить возможности пакетной обработки в поток интеграции

В настоящее время у меня есть поток интеграции Spring, который отлично работает (см. Ссылку на диаграмму). Я хотел бы добавить пакетную обработку поверх моей текущей конфигурации, чтобы разрешить повторную попытку с экспоненциальным откатом, шаблоном автоматического выключателя и сохранением заданий в базе данных для перезапуска.

Поток интеграции состоит из шлюза, который принимает сообщение ‹MyObj›, которое в конечном итоге направляется к преобразователю, который преобразует сообщение ‹MyObj› в сообщение ‹String›. Затем агрегатор принимает сообщение ‹String› и в конечном итоге выпускает объединенное сообщение ‹String› (с использованием как стратегии выпуска размера, так и MessageGroupStoreReaper с тайм-аутом). Объединенная строка является полезной нагрузкой файла, загруженного с помощью адаптера исходящего канала SFTP.

Я искал, читал документы, просмотрел множество примеров и не могу понять, как инкапсулировать последний шаг процесса в пакетное задание. Мне нужна возможность повторить загрузку строки (в качестве полезной нагрузки файла), если во время загрузки возникла проблема с подключением SFTP или другое исключение. Я также хочу иметь возможность перезапустить (используя JobRepository с поддержкой базы данных) в случае некоторого сбоя, поэтому я не думаю, что использования Retry Advice достаточно.

Пожалуйста, объясните и помогите мне понять, как соединить части и что использовать (шлюз для запуска задания, преобразователь MessageToJobRequest, ItemReader, ItemWriter ??). Я также не уверен, как получить доступ к каждому сообщению ‹String› и отправить его на адаптер канала SFTP внутри задания, шага или тасклета.

Текущий поток: http://i.stack.imgur.com/GnurV.png


person Zach Z    schedule 09.09.2014    source источник


Ответы (1)


Прежде всего, давайте посмотрим, как мы можем преодолеть ваши требования без пакетной обработки.

  1. <int-sftp:outbound-channel-adapter> имеет <request-handler-advice-chain>, где вы можете настроить RequestHandlerRetryAdvice и RequestHandlerCircuitBreakerAdvice.

  2. Для достижения опции restartable вы можете сделать входной канал этого адаптера постоянной очередью с message-store.

Теперь о Batch.

Чтобы запустить задание из потока интеграции, вы должны написать несколько MessageToJobRequest и после этого использовать <batch-int:job-launching-gateway>. Здесь, конечно, вы можете поместить свой payload в jobParameters.

Чтобы отправить сообщение от Job на какой-либо канал (например, в sftp-адаптер), вы можете использовать org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter.

Подробнее читайте здесь: http://docs.spring.io/spring-batch/reference/html/springBatchIntegration.html

person Artem Bilan    schedule 10.09.2014