Интеграция Spring заставляет асинхронный шлюз ждать результата агрегации

Я использую весеннюю интеграцию для обработки файлов. Я хочу читать файл построчно, фильтровать, конвертировать и отправлять в кафку. Файл передается на обработку через асинхронный шлюз. В конце обработки я добавил агрегатор для двух целей:

  • Я хочу дождаться обработки всего файла. То есть все правильно обработанные строки были отправлены в kafka, и все строки, вызвавшие ошибки, были обработаны потоком, который считывает из канала ошибок.
  • В качестве возвращаемого значения для шлюза я хочу получать агрегированные результаты для обработки файлов. Сколько строк было прочитано из файла, сколько обработано без ошибок и т. Д.

То, что я сделал до сих пор, я создал агрегатор, который ожидает файловых маркеров START и END + все обработанные строки. Я создал настраиваемую группу сообщений, которая подсчитывает обработанные строки, а также хранит replyChannel, которое берется из сообщения маркера START и счетчика строк, которое берется из сообщения маркера END файла. Для группы также установлен тайм-аут 5 секунд. Когда группа завершает работу, она выдает сообщение со статистикой, которую я хочу, и заголовок replyChannel со значением, взятым из маркера START. Насколько я понимаю, шлюз ожидает ответа на этот replyChannel. Вот код для группы сообщений:

public void add(Message<?> messageToAdd) {
        if(messageToAdd.getPayload() instanceof FileMarker) {
            FileMarker marker = (FileMarker) messageToAdd.getPayload();
            switch (marker.getMark()) {
                case START:
                    replyChannel = messageToAdd.getHeaders().get(MessageHeaders.REPLY_CHANNEL);
                    break;
                case END:
                    expectedRowCount.set(marker.getLineCount());
                    break;
                default:
                    throw new IllegalStateException("Unexpected file mark");
            }
        } else {
            ProcessingResult processingResult = messageToAdd.getHeaders()
                    .get(ProcessingConstants.PROCESSING_RESULT_HEADER, ProcessingResult.class);
            assert processingResult != null;
            rowCount.incrementAndGet();
            switch (processingResult) {
                case FILTERED:
                    filteredCount.incrementAndGet();
                    break;
                case PROCESSED:
                    processedCount.incrementAndGet();
                    break;
                case PROCESSING_ERROR:
                    processingErrorCount.incrementAndGet();
                    break;
                case KAFKA_ERROR:
                    kafkaErrorCount.incrementAndGet();
                    break;
                default:
                    throw new IllegalStateException("Unrecognized processing result: " + processingResult);
            }
        }
    }

У меня проблема в том, что мой шлюз никогда не получает ответа и ждет бесконечно. Как заставить асинхронный шлюз ждать, пока агрегатор отправит свое сообщение и получит полезную нагрузку этого сообщения в результате обработки шлюзом?

Тест, воссоздающий проблему, можно найти здесь https://github.com/hawk1234/spring-integration-example фиксации 9f121f0729d8076872e6fbdcd7b1b91ca9ea8cb4. При запуске тестов журналы приложения доступны по пути build / logs / spring-integration-example.log. В настоящее время тест зависает, поскольку шлюз не получает ответа. Кроме того, весь процесс находится в стадии разработки, поэтому группа освобождается только по истечении тайм-аута.


person MAREK    schedule 20.03.2020    source источник


Ответы (2)


Я создал настраиваемую группу сообщений, которая

Что-то не так.

Вы должны выполнить всю необходимую логику, когда у вас уже есть все сообщения (включая FileMarker) в группе, стандартной группе. Есть MessageGroupProcessor, который 3 вызывается, когда ReleaseStrategy возвращает true. Итак, здесь у вас есть все сообщения для группы, и вы можете выполнить сбор всей необходимой статистики для получения желаемого ответа.

Ваша заявка слишком сложна, чтобы ее можно было быстро усвоить. Я вижу, что вы не отправляете ошибки агрегатору. И он должен быть где-то в верхнем потоке, а не только в FileMarker подпотоке. Я имею в виду, что было бы лучше иметь агрегатор в отдельном потоке верхнего уровня и действительно отправлять FileMarker непосредственно этому, а остальное от sendSuccessChannel(), а также после ошибок обработки.

person Artem Bilan    schedule 20.03.2020
comment
Объявление 1. Что касается MessageGroupProcessor, я знаю, что тут немного хакаю. В документации четко указано, что я должен хранить только сообщения в группе, но я не могу этого сделать, потому что не хочу хранить в памяти все содержимое файла. Мне кажется, что этот подход работает, но, может быть, в некоторых случаях это может вызвать ошибки? Объявление 2. Я вижу, что вы не отправляете ошибки агрегатору Верно, поток все еще продолжается, но здесь работает правило тайм-аута, и агрегированное сообщение может быть отправлено на шлюз. - person MAREK; 20.03.2020
comment
Что ж, вы можете просто преобразовать каждую строку success или failure для сбора будущей статистики. - person Artem Bilan; 20.03.2020
comment
Хорошее замечание, я учту это. Однозначно, в таком случае мне не пришлось бы писать собственный MessageGroup и изменять MessageGroupStore - более чистый код. Хотя считаю, что подсчет результата «на лету» должен быть более эффективным. Возможно, мне даже не нужно использовать AtomicLong, поскольку SimpleMessageStore синхронизирует операции в группе. - person MAREK; 20.03.2020

Мне удалось исправить проблему. Мне пришлось вручную отправить агрегированное сообщение на MessageHeaders.REPLY_CHANNEL, для этого я использовал метод BaseIntegrationFlowDefinition#logAndReply. Исправление, как и готовый пример, доступно с фиксацией eefd5f472891ded39f363ff71ec530ada800f704.

person MAREK    schedule 23.03.2020