spring xd теряет сообщения при обработке огромного объема

Я использую spring xd. Мой поток выглядит так, как показано ниже, и запускаю тесты на контейнере с 3 узлами с 1 узлом администратора с кроликом в качестве транспорта.

aws-s3|processor1|http-client|processor2>queue:readyQueue

Я создал внизу кран.

tap1  aws-s3>s3Queue


tap2  processor1>processorQueue1

tap3  http-client>httpQueue

В своих тестах я запускаю следующие сценарии:

Scenario1: 5 файлов по 200 КБ = 1 миллион записей, параллелизм http-client = 70 и процессор2 = 30

Я вижу 900k сообщения s3Queue

Я вижу 889 тыс. процессоров сообщений Queue1

Я вижу сообщение 886k httpQueue

Я вижу 883k сообщений обработчика сообщенийQueue2 Сообщения теряются везде и их случайные

Scenario2:

5 файлов по 200 тыс. = 1 миллион записей и параллелизм всех модулей = 1

Я вижу 998800 сообщений s3Queue

Я вижу 998760 сообщений обработчика очереди1

Я вижу сообщение 997540 httpQueue

Я вижу 997530 сообщений обработчика очереди2

Даже это число является случайным и непоследовательным

Scenario3

Я изменил поток, как показано ниже, и параллелизм = 1 и 5 файлов по 200 тыс. = 1 миллион записей.

aws-s3 >testQueue

Я получаю все свои сообщения, которые я запускаю 3 раза, и никаких проблем. Я получаю все свои 1 миллион сообщений.

scenario4

Я изменил поток, как показано ниже, и параллелизм = 1 5 файлов по 200 тыс. = 1 миллион записей.

aws-s3 |processor1 >testQueue2

Я получаю все свои сообщения, которые я запускаю 3 раза, и никаких проблем. Я получаю все свои 1 миллион сообщений.

В сценарии 4 и сценарии 3 прием данных происходит быстрее, и для обработки 5 миллионов требуется 5 минут, а прием данных в очереди транспорта кролика был быстрее, например 5 тыс. Сообщений в секунду.

В сценарии 1 прием данных был медленнее, даже модуль s3 извлекал данные очень медленно, например, от 300 до 1000 msg в секунду.

В сценарии 2 s3 извлекал данные быстрее, но HTTP-клиент был медленным, например, 100 msg в секунду, но aws-s3 извлекал данные быстро, как 3-4k msg в секунду.

Я думаю, что просмотр xd threading вызывает проблемы, и я теряю сообщения. Пожалуйста, не могли бы вы помочь мне, как решить эту проблему.

Обновить

Scenario 5 

Я изменил reply-timeout на -1 в http-клиенте, а затем потерял только 37 сообщений.

Теперь снова я запускаю 2-ю итерацию, я потерял 25000 сообщений, я вижу журнал ревущих контейнеров, когда это произошло

2016-03-04T03:42:04-0500 1.2.1.RELEASE ERROR task-scheduler-7 handler.LoggingHandler - org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@b6700b1]; nested exception is org.springframework.amqp.AmqpIOException: java.io.IOException
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
        at org.springframework.xd.dirt.integration.rabbit.RabbitMessageBus$SendingHandler.handleMessageInternal(RabbitMessageBus.java:891)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:287)
        at org.springframework.integration.channel.interceptor.WireTap.preSend(WireTap.java:129)
        at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:392)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:282)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:245)
        at sun.reflect.GeneratedMethodAccessor204.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
        at org.springframework.integration.monitor.DirectChannelMetrics.monitorSend(DirectChannelMetrics.java:114)
        at org.springframework.integration.monitor.DirectChannelMetrics.doInvoke(DirectChannelMetrics.java:98)
        at org.springframework.integration.monitor.DirectChannelMetrics.invoke(DirectChannelMetrics.java:92)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
        at com.sun.proxy.$Proxy1537.send(Unknown Source)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
        at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
        at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
        at org.springframework.integration.splitter.AbstractMessageSplitter.produceOutput(AbstractMessageSplitter.java:157)
        at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)

Caused by: org.springframework.amqp.AmqpIOException: java.io.IOException
        at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:63)
        at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:51)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:758)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$300(CachingConnectionFactory.java:747)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:419)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:395)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:364)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:357)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1100(CachingConnectionFactory.java:75)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:763)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createChannel(ConnectionFactoryUtils.java:85)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:134)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:67)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1035)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:540)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:635)
        at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.send(AmqpOutboundEndpoint.java:331)
        at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.handleRequestMessage(AmqpOutboundEndpoint.java:323)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
        ... 93 more
Caused by: java.io.IOException
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
        at com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:125)
        at com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:134)
        at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:499)
        at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:44)
        ... 112 more
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
        at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:348)
        at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:221)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
        ... 116 more
Caused by: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 23364
        at com.rabbitmq.client.impl.ChannelManager.getChannel(ChannelManager.java:80)
        at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:552)
        ... 1 more

2016-03-04T03:42:05-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'xdbus.tap-s3.tap:stream:stream.batch-aws-s3-source.0' in vhost '/', class-id=50, method-id=20)


2016-03-04T03:53:13-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T03:53:13-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'xdbus.tap-s3.tap:stream:stream.batch-aws-s3-source.0' in vhost '/', class-id=50, method-id=20)
~                                                                                                                                                                                 


2016-03-04T02:57:54-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:8080 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T02:57:55-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:8080 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T03:42:04-0500 1.2.1.RELEASE ERROR AMQP Connection yyy:5672 connection.CachingConnectionFactory - Channel shutdown: connection error

Обновлено

Я обнаружил проблему потери сообщений, когда происходит это исключение, я вижу много потерянных сообщений. Этот шаблон я тестировал несколько раз. Каждый раз, когда это исключение происходит, я вижу потерянные сообщения. Кроме того, из-за параллелизма эта проблема возникает часто.

2016-03-05T13:59:41-0500 1.2.1.RELEASE ERROR AMQP Connection host1:5672 connection.CachingConnectionFactory - Channel shutdown: connection error

конфигурация кролика

spring:
  rabbitmq:
   addresses: host1:5672,host2:5672,host3:5672
   adminAddresses: http://host1:15672,http://host2:15672,http://host3:15672
   nodes: [email protected],[email protected],[email protected]
   username: test
   password: test
   virtual_host: /
   useSSL: false
   sslProperties:

обновлено с увеличением размера кеша до 200

Я добавил xml, предоставленный вами, и увеличил размер кеша до 200. Это происходит при обработке 1 миллиона и 80 тыс. сообщений. Только мой HTTP-клиент параллелизма равен 100, все остальные равны 1. Медленная обработка остановленных сообщений все еще существует до http-клиента. очередь и тот же счетчик. Но количество сообщений в моем именованном канале медленно увеличивается, например, 10 сообщений в минуту, но это очень медленный s3-poller|processor|http-client>queue:batchCacheQueue

Сообщения не уменьшаются в очереди до http 186174. Но медленно msg поступают в batchCacheQueue

введите здесь описание изображения

Тестовый пример для имитации:

1) Я использовал источник Spring Integration aws-s3 с разделителем в композитном модуле | процессор, подобный синтаксическому анализу xml | http-клиент с параллелизмом 100 > именованный канал.

2) Я думаю, что источник файла также может работать. Создайте один файл из миллиона записей и попробуйте извлечь его из файла.

3) После запуска с 4 по 5 мы видим, что это исключение происходит


person constantlearner    schedule 04.03.2016    source источник


Ответы (1)


Вызвано: com.rabbitmq.client.impl.UnknownChannelException: неизвестный номер канала 23364

Обнаружена проблема, когда каналы сильно перегружаются; вам нужно увеличить размер кэша канала в фабрике соединений кэширования кролика.

См. этот ответ для обходного пути.

Я открыл задачу JIRA, чтобы следующая версия Spring XD предоставила этот параметр в server.yml, поэтому вам не нужно переопределять файл конфигурации шины.

person Gary Russell    schedule 05.03.2016
comment
Спасибо, Гэри. Я обнаружил проблему с потерей сообщений, пожалуйста, посмотрите мое обновление. Дайте мне знать, что это работа, которую мы должны решить. - person constantlearner; 06.03.2016
comment
Да; увеличение размера кеша должно решить эту проблему; В настоящее время я не знаю основной причины исключения неизвестного канала (мне не удалось воспроизвести его в тестах), но увеличение размера кеша позволяет избежать этого. - person Gary Russell; 06.03.2016
comment
Я обновил это, и сообщения задерживаются, пожалуйста, посмотрите мое обновление - person constantlearner; 07.03.2016
comment
Поскольку вы не используете именованный канал, возможно, у вас заканчиваются ресурсы на сервере кролика. Я вижу, что очередь находится в flow. Может быть, посмотреть в журналах кролика? - person Gary Russell; 08.03.2016
comment
Я снова проводил аналогичный тест с 200 тыс. записей, извлекаемыми из s3 каждые 10 минут вместо 200 тыс. каждые 5 секунд. Теперь я не вижу этой проблемы. Добавление задержки в приеме помогло. Но также я изменил параллелизм http-клиента на 10 со 100. Но и здесь я не использую именованный канал. Также в кролике говорится, что рабы не синхронизированы и не могут удалить или очистить этот именованный канал. - person constantlearner; 08.03.2016
comment
Также я вижу, что github.com/spring-projects/spring-amqp/blob/ увеличить кеш канала для высокой параллелизма? Было ли это причиной - person constantlearner; 08.03.2016
comment
Несколько человек сообщили о UnknownChannelException. Мы не отследили основную причину, но кажется, что увеличение размера кеша — это обходной путь. Я не совсем понимаю, есть ли у вас проблема, но я не могу помочь только со статическими данными, которые вы предоставляете. Если вы можете воспроизвести свою проблему с помощью простого тестового примера, мы можем посмотреть. - person Gary Russell; 08.03.2016
comment
Обязательно добавляйте тестовый пример - person constantlearner; 08.03.2016
comment
Увеличение размера кеша помогает и не видит проблемы. Но как установить размер кеша? Если я использую больше параллелизма, должен ли я использовать больший размер кеша? Если я использую больше модулей или больше потоков, должен ли я увеличить размер кеша? - person constantlearner; 08.03.2016
comment
Проверка каналов для публикации сообщений очень короткая — трудно дать рекомендации, не зная больше о вашем приложении. Как правило, не помешает увидеть его на высоком уровне — мы создадим столько каналов, сколько необходимо, но обычно это будет намного меньше, чем общее количество потоков, публикующих сообщения. В версии 1.6 мы добавили информацию о кеше в JMX MBean, чтобы вы могли отслеживать кеш. - person Gary Russell; 08.03.2016
comment
Я столкнулся с аналогичной проблемой, например, очередь перешла в состояние потока, а системный процесс через некоторое время стал очень медленным. У нас много потоков в производстве, например 10 потоков, я думаю, установить размер кеша на 1000. Это нормально или это вызовет другие проблемы с производительностью? Просто ищу ваше мнение. - person constantlearner; 09.03.2016
comment
Это не должно быть проблемой, но вам нужно выяснить, почему ваш кролик замедляется. - person Gary Russell; 09.03.2016