Я использую весеннюю интеграцию для обработки файлов. Я хочу читать файл построчно, фильтровать, конвертировать и отправлять в кафку. Файл передается на обработку через асинхронный шлюз. В конце обработки я добавил агрегатор для двух целей:
- Я хочу дождаться обработки всего файла. То есть все правильно обработанные строки были отправлены в kafka, и все строки, вызвавшие ошибки, были обработаны потоком, который считывает из канала ошибок.
- В качестве возвращаемого значения для шлюза я хочу получать агрегированные результаты для обработки файлов. Сколько строк было прочитано из файла, сколько обработано без ошибок и т. Д.
То, что я сделал до сих пор, я создал агрегатор, который ожидает файловых маркеров START и END + все обработанные строки. Я создал настраиваемую группу сообщений, которая подсчитывает обработанные строки, а также хранит replyChannel
, которое берется из сообщения маркера START и счетчика строк, которое берется из сообщения маркера END файла. Для группы также установлен тайм-аут 5 секунд. Когда группа завершает работу, она выдает сообщение со статистикой, которую я хочу, и заголовок replyChannel
со значением, взятым из маркера START. Насколько я понимаю, шлюз ожидает ответа на этот replyChannel
. Вот код для группы сообщений:
public void add(Message<?> messageToAdd) {
if(messageToAdd.getPayload() instanceof FileMarker) {
FileMarker marker = (FileMarker) messageToAdd.getPayload();
switch (marker.getMark()) {
case START:
replyChannel = messageToAdd.getHeaders().get(MessageHeaders.REPLY_CHANNEL);
break;
case END:
expectedRowCount.set(marker.getLineCount());
break;
default:
throw new IllegalStateException("Unexpected file mark");
}
} else {
ProcessingResult processingResult = messageToAdd.getHeaders()
.get(ProcessingConstants.PROCESSING_RESULT_HEADER, ProcessingResult.class);
assert processingResult != null;
rowCount.incrementAndGet();
switch (processingResult) {
case FILTERED:
filteredCount.incrementAndGet();
break;
case PROCESSED:
processedCount.incrementAndGet();
break;
case PROCESSING_ERROR:
processingErrorCount.incrementAndGet();
break;
case KAFKA_ERROR:
kafkaErrorCount.incrementAndGet();
break;
default:
throw new IllegalStateException("Unrecognized processing result: " + processingResult);
}
}
}
У меня проблема в том, что мой шлюз никогда не получает ответа и ждет бесконечно. Как заставить асинхронный шлюз ждать, пока агрегатор отправит свое сообщение и получит полезную нагрузку этого сообщения в результате обработки шлюзом?
Тест, воссоздающий проблему, можно найти здесь https://github.com/hawk1234/spring-integration-example фиксации 9f121f0729d8076872e6fbdcd7b1b91ca9ea8cb4. При запуске тестов журналы приложения доступны по пути build / logs / spring-integration-example.log. В настоящее время тест зависает, поскольку шлюз не получает ответа. Кроме того, весь процесс находится в стадии разработки, поэтому группа освобождается только по истечении тайм-аута.