Как обрабатывать более 10 одновременных сообщений из очереди AWS SQS FiFo с помощью Spring Integration

Я хочу иметь возможность обрабатывать более 10 сообщений SQS одновременно с помощью Spring Integration Workflow.

Исходя из этого вопроса, было рекомендовано использовать ExecutorChannel. Я обновил свой код, но симптомы остались прежними.

Как выполнить поток интеграции Spring в несколько потоков для параллельного использования большего количества сообщений очереди Amazon SQS?

После этого обновления мое приложение запрашивает 10 сообщений, обрабатывает их, и только после того, как я вызову amazonSQSClient.deleteMessage ближе к концу потока, оно примет еще 10 сообщений из очереди SQS.

Приложение использует очередь SQS FiFo.

Есть ли что-то еще, что мне не хватает, или это неизбежный симптом использования SqsMessageDeletionPolicy.NEVER с последующим удалением сообщений в конце потока? Принятие сообщений в начале потока на самом деле не вариант из-за других ограничений.

Вот соответствующие фрагменты кода с некоторыми упрощениями, но я надеюсь, что он выражает проблему.

Конфигурация очереди

@Bean
public AsyncTaskExecutor inputChannelTaskExecutor() {
    SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
    executor.setConcurrencyLimit(50);
    return executor;
}

@Bean
@Qualifier("inputChannel")
public ExecutorChannel inputChannel() {
    return new ExecutorChannel(inputChannelTaskExecutor());
}

Я также попробовал ThreadPoolTaskExecutor вместо SimpleAsyncTaskExecutor с тем же результатом, но я включу и его, если он предлагает другое понимание.

    @Bean
    public AsyncTaskExecutor inputChannelTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix("spring-async-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.afterPropertiesSet();
        executor.initialize();
        return executor;
    }

Адаптер канала SQS

@Bean
public SqsMessageDrivenChannelAdapter changeQueueMessageAdapter() {
    SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(this.amazonSQSClient, changeQueue);
    adapter.setOutputChannel(inputChannel);
    adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.NEVER);
    return adapter;
}


@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
    return Pollers.fixedRate(500, TimeUnit.MILLISECONDS).maxMessagesPerPoll(10);
}

Упрощенный основной поток

Обычный сценарий для нас - получить несколько правок Branch за короткий период времени. Этот поток "заботится" только о том, чтобы произошло хотя бы одно изменение. messageTransformer извлекает идентификатор из документа полезной нагрузки и помещает его в заголовок dsp_docId, который мы затем используем для агрегирования (мы используем этот идентификатор в нескольких других местах, поэтому мы почувствовал, что заголовок имеет смысл, а не выполняет всю работу в настраиваемом агрегаторе).

ProvisioningServiceActivator получает последнюю версию Branch, затем маршрутизатор решает, нужны ли ему дальнейшие преобразования (в этом случае он отправляет его в transformBranchChannel) или его можно отправить на наш экземпляр PI (через sendToPiChannel).

Поток преобразования (не показан, я не думаю, что он вам нужен) в конечном итоге приводит к отправке в поток PI, просто сначала он выполняет больше работы.

istingGroupProcessor захватывает все заголовки aws_receiptHandle и добавляет их в новый заголовок как | разделенный список.

Поток sendToPi (и errorFlow) заканчивается вызовом настраиваемого обработчика, который заботится об удалении всех сообщений SQS, на которые ссылается этот список строк aws_receiptHandle.

@Bean
IntegrationFlow sqsListener() {
    return IntegrationFlows.from(inputChannel)
                           .transform(messageTransformer)
                           .aggregate(a -> a.correlationExpression("1")
                                            .outputProcessor(listingGroupProcessor)
                                            .autoStartup(true)
                                            .correlationStrategy(message -> message.getHeaders().get("dsp_docId"))
                                            .groupTimeout(messageAggregateTimeout)  // currently 25s
                                            .expireGroupsUponCompletion(true)
                                            .sendPartialResultOnExpiry(true)
                                            .get())

                           .handle(provisioningServiceActivator, "handleStandard")
                           .route(Branch.class, branch -> (branch.isSuppressed() == null || !branch.isSuppressed()),
                                  routerSpec -> routerSpec.channelMapping(true, "transformBranchChannel")
                                                          .resolutionRequired(false)
                                                          .defaultOutputToParentFlow())

                           .channel(sendtoPiChannel)
                           .get();
}

comment
Можете ли вы сделать больше SqsMessageDrivenChannelAdapter с той же настройкой и отправкой на тот же канал? Я имею в виду, давайте попробуем следовать этой рекомендации: stackoverflow.com/questions/46377333/!   -  person Artem Bilan    schedule 14.09.2018
comment
Ага. С тремя экземплярами SqsMessageDrivenChannelAdapter, как определено выше (но с одним, двумя и тремя в конце имени компонента), он по-прежнему читает только 10 сообщений за раз. Меня это очень смущает, так как я думал, что это сработает, если будет немного неэлегантно.   -  person Steve    schedule 14.09.2018
comment
Итак, даже с 3 одновременными адаптерами вы все равно получаете максимум 10 сообщений от AWS SQS? Тогда это действительно их ограничение, пока мы не подтвердим (не удалим) эти извлеченные 10 сообщений ...   -  person Artem Bilan    schedule 14.09.2018
comment
Это начинает выглядеть так. Раздражающий. Спасибо за уделенное время. Ну что ж, есть выходные, чтобы обдумать план Б.   -  person Steve    schedule 14.09.2018


Ответы (1)


Я решил опубликовать это в качестве ответа, так как это решает мою проблему и может помочь другим. В качестве ответа скорее всего будет обнаружен ответ, а не изменение исходного вопроса, которое может остаться незамеченным.

Во-первых, я должен был заметить, что мы используем очередь FiFo.

Проблема была на самом деле дальше по цепочке, где мы устанавливали для MessageGroupId простое значение, описывающее источник данных. Это означало, что у нас были очень большие группы сообщений.

Из документации ReceiveMessage видно, что он вполне разумно останавливает в этом сценарии вы запрашиваете больше сообщений из этой группы, поскольку невозможно гарантировать порядок, если сообщение необходимо вернуть в очередь.

Обновление кода, отправляющего сообщение, для установки соответствующего MessageGroupId означало, что ExecutorChannel работал должным образом.

Хотя сообщения с определенным MessageGroupId невидимы, сообщения, принадлежащие тому же MessageGroupId, больше не возвращаются, пока не истечет время ожидания видимости. Вы по-прежнему можете получать сообщения с другим MessageGroupId, если он также отображается.

person Steve    schedule 17.09.2018