Драйвер RabbitMQ С# перестает получать сообщения

Есть ли у вас какие-либо указания, как определить, когда возникла проблема с подпиской, чтобы я мог повторно подключиться?

Мой сервис использует RabbitMQ.Client.MessagePatterns.Subscription для своей подписки. Через некоторое время мой клиент молча перестает получать сообщения. Я подозреваю проблемы с сетью, так как наше VPN-соединение не самое надежное.

Я некоторое время читал документы в поисках ключа, чтобы узнать, когда эта подписка может быть нарушена из-за проблемы с сетью, но без особой удачи. Я пытался проверить, что соединение и канал все еще открыты, но всегда кажется, что он все еще открыт.

Сообщения, которые он обрабатывает, работают довольно хорошо и подтверждаются обратно в очередь, поэтому я не думаю, что это проблема с «подтверждением».

Я уверен, что мне просто не хватает чего-то простого, но я еще не нашел его.

public void Run(string brokerUri, Action<byte[]> handler)
{
    log.Debug("Connecting to broker: {0}".Fill(brokerUri));
    ConnectionFactory factory = new ConnectionFactory { Uri = brokerUri };

    using (IConnection connection = factory.CreateConnection())
    {
        using (IModel channel = connection.CreateModel())
        {
            channel.QueueDeclare(queueName, true, false, false, null);

            using (Subscription subscription = new Subscription(channel, queueName, false))
            {
                while (!Cancelled)
                {
                    BasicDeliverEventArgs args;

                    if (!channel.IsOpen)
                    {
                        log.Error("The channel is no longer open, but we are still trying to process messages.");
                        throw new InvalidOperationException("Channel is closed.");
                    }
                    else if (!connection.IsOpen)
                    {
                        log.Error("The connection is no longer open, but we are still trying to process message.");
                        throw new InvalidOperationException("Connection is closed.");
                    }

                    bool gotMessage = subscription.Next(250, out args);

                    if (gotMessage)
                    {
                        log.Debug("Received message");
                        try
                        {
                            handler(args.Body);
                        }
                        catch (Exception e)
                        {
                            log.Debug("Exception caught while processing message. Will be bubbled up.", e);
                            throw;
                        }

                        log.Debug("Acknowledging message completion");
                        subscription.Ack(args);
                    }
                }
            }
        }
    }
}

ОБНОВИТЬ:

Я смоделировал сбой сети, запустив сервер на виртуальной машине, и я действительно получаю исключение (RabbitMQ.Client.Exceptions.OperationInterruptedException: операция AMQP была прервана), когда я разрываю соединение на достаточно долгое время, поэтому возможно дело не в сети. Теперь я не знаю, что бы это было, но это выходит из строя всего через пару часов работы.


person Aaron Milner    schedule 19.09.2012    source источник


Ответы (1)


РЕДАКТИРОВАТЬ: Поскольку я все еще получаю положительные отзывы по этому поводу, я должен указать, что клиент .NET RabbitMQ теперь имеет встроенную функциональность: https://www.rabbitmq.com/dotnet-api-guide.html#connection-recovery

В идеале вы должны иметь возможность использовать это и избегать ручной реализации логики повторного подключения.


Недавно мне пришлось реализовать почти то же самое. Из того, что я могу сказать, большая часть доступной информации о RabbitMQ предполагает, что либо ваша сеть очень надежна, либо вы запускаете брокера RabbitMQ на том же компьютере, что и любой клиент, отправляющий или получающий сообщения, что позволяет Rabbit решать любые проблемы с подключением.

На самом деле не так сложно настроить клиент Rabbit, чтобы он был устойчивым к обрывам соединений, но есть несколько особенностей, с которыми вам нужно иметь дело.

Первое, что вам нужно сделать, это включить сердцебиение:

ConnectionFactory factory = new ConnectionFactory() 
{
  Uri = brokerUri,
  RequestedHeartbeat = 30,
}; 

Установка «RequestedHeartbeat» на 30 заставит клиент каждые 30 секунд проверять, живо ли соединение. Если эта функция не включена, подписчик сообщения будет счастливо ждать прихода другого сообщения, не подозревая, что его соединение испортилось.

Включение пульса также заставляет сервер проверять, работает ли соединение, что может быть очень важно. Если соединение разорвано после того, как сообщение было получено подписчиком, но до того, как оно было подтверждено, сервер просто предполагает, что клиенту требуется много времени, и сообщение «застревает» в мертвом соединении, пока оно не будет закрыто. Когда пульсация включена, сервер распознает, когда соединение ухудшается, и закрывает его, помещая сообщение обратно в очередь, чтобы другой подписчик мог его обработать. Без пульса мне пришлось войти вручную и закрыть соединение в пользовательском интерфейсе управления Rabbit, чтобы застрявшее сообщение могло быть передано подписчику.

Во-вторых, вам нужно будет обработать OperationInterruptedException. Как вы заметили, это обычно исключение, которое клиент Rabbit выдает, когда замечает, что соединение было прервано. Если IModel.QueueDeclare() вызывается, когда соединение было прервано, вы получите это исключение. Обработайте это исключение, избавившись от своей подписки, канала и соединения и создав новые.

Наконец, вам придется обрабатывать то, что делает ваш потребитель при попытке использовать сообщения из закрытого соединения. К сожалению, каждый способ использования сообщений из очереди в клиенте Rabbit, по-видимому, реагирует по-разному. QueueingBasicConsumer выдает EndOfStreamException, если вы вызываете QueueingBasicConsumer.Queue.Dequeue при закрытом соединении. EventingBasicConsumer ничего не делает, так как просто ждет сообщения. Из того, что я могу сказать, попробовав это, класс Subscription, который вы используете, похоже, возвращает true из вызова Subscription.Next, но значение args равно null. Еще раз, справьтесь с этим, избавившись от вашего соединения, канала и подписки и воссоздав их.

Значение connection.IsOpen будет обновлено до False при сбое соединения с включенным пульсом, поэтому вы можете проверить это, если хотите. Однако, так как пульсация выполняется в отдельном потоке, вам все равно придется обрабатывать случай, когда соединение открыто, когда вы его проверяете, но закрывается до вызова subscription.Next().

И последнее, на что следует обратить внимание, это IConnection.Dispose(). Этот вызов вызовет EndOfStreamException, если вы вызываете dispose после того, как соединение было закрыто. Мне это кажется ошибкой, и мне не нравится не вызывать dispose для объекта IDisposable, поэтому я вызываю его и проглатываю исключение.

Собираем все вместе в быстром и грязном примере:

public bool Cancelled { get; set; }

IConnection _connection = null;
IModel _channel = null;
Subscription _subscription = null;

public void Run(string brokerUri, string queueName, Action<byte[]> handler)
{
    ConnectionFactory factory = new ConnectionFactory() 
    {
        Uri = brokerUri,
        RequestedHeartbeat = 30,
    };

    while (!Cancelled)
    {               
        try
        {
            if(_subscription == null)
            {
                try
                {
                    _connection = factory.CreateConnection();
                }
                catch(BrokerUnreachableException)
                {
                    //You probably want to log the error and cancel after N tries, 
                    //otherwise start the loop over to try to connect again after a second or so.
                    continue;
                }

                _channel = _connection.CreateModel();
                _channel.QueueDeclare(queueName, true, false, false, null);
                _subscription = new Subscription(_channel, queueName, false);
            }

            BasicDeliverEventArgs args;
            bool gotMessage = _subscription.Next(250, out args);
            if (gotMessage)
            {
                if(args == null)
                {
                    //This means the connection is closed.
                    DisposeAllConnectionObjects();
                    continue;
                }

                handler(args.Body);
                _subscription.Ack(args);
            }
        }
        catch(OperationInterruptedException ex)
        {
            DisposeAllConnectionObjects();
        }
    }
    DisposeAllConnectionObjects();
}

private void DisposeAllConnectionObjects()
{
    if(_subscription != null)
    {
        //IDisposable is implemented explicitly for some reason.
        ((IDisposable)_subscription).Dispose();
        _subscription = null;
    }

    if(_channel != null)
    {
        _channel.Dispose();
        _channel = null;
    }

    if(_connection != null)
    {
        try
        {
            _connection.Dispose();
        }
        catch(EndOfStreamException) 
        {
        }
        _connection = null;
    }
}
person Brian Zell    schedule 04.10.2012
comment
Вот это да. Это выглядит великолепно. Сегодня утром я закодировал свой сервис и развернул его. Вы сэкономили мне массу времени. - person Aaron Milner; 09.10.2012