Очередь служебной шины Azure — QueueClient.Receive() возвращает значение null BrokeredMessage, когда сообщения находятся в очереди

У меня есть сообщения в очереди служебной шины Azure, которые я не могу получить. И я не получаю никаких указаний относительно того, в чем проблема. Я думаю, что это как-то связано с размером сообщения. Из приведенного ниже кода видно, что я использую OpenFileDialog. Я выбираю изображения в формате jpeg, и они отправляются в очередь. Теперь, когда я отправляю небольшие изображения размером менее 50 КБ, они нормально отображаются процессом получения, но более крупные изображения размером более 100 КБ просто остаются в очереди. MSDN говорит, что ограничение на размер сообщения составляет 256 КБ, поэтому я не уверен, что здесь происходит.

У меня два класса. Один из них — SendToQueue, а другой — RecvFromQueue. Вот код.

using System;
using System.IO;
using System.Windows.Forms;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure;

namespace ServiceBusQueueApp
{
    public class SendToQueue
    {
        private const string c_testqueue = "TestQueue";

        [STAThreadAttribute]
        static void Main(string[] args)
        {
            // Configure Queue Settings
            QueueDescription qd = new QueueDescription(c_testqueue)
            {
                MaxSizeInMegabytes = 5120,
                DefaultMessageTimeToLive = new TimeSpan(1, 1, 0)
            };

            // Create a new Queue with custom settings
            string connectionString =
                CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");

            var namespaceManager =
                NamespaceManager.CreateFromConnectionString(connectionString);

            if (!namespaceManager.QueueExists(c_testqueue))
            {
                namespaceManager.CreateQueue(qd);
            }

            namespaceManager.DeleteQueue(qd.Path);
            namespaceManager.CreateQueue(qd);

            QueueClient client = QueueClient.CreateFromConnectionString(connectionString, c_testqueue);

            double maxSize = Math.Pow(2, 18);

            OpenFileDialog openFile = new OpenFileDialog();
            while (true)
            {
                if (openFile.ShowDialog() == DialogResult.Cancel)
                {
                    break;
                }

                var messageBodyStream = new FileStream(openFile.FileName, System.IO.FileMode.Open, FileAccess.Read, FileShare.ReadWrite);

                if (messageBodyStream.Length > maxSize)
                {
                    MessageBox.Show("File is larger than 256KB.");
                    continue;
                }
                BrokeredMessage msg =
                    new BrokeredMessage(messageBodyStream);
                msg.Properties["MyProperty"] = "Test Value";


                try
                {
                    //send msg to the queue
                    client.Send(msg);
                }
                catch (Exception exception)
                {
                    MessageBox.Show(exception.Message);
                    throw;
                }
            }

        }
    }
}


using System;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Windows.Forms;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure;

namespace ServiceBusQueueApp
{
    class RecvFromQueue
    {

        private const string c_testqueue = "TestQueue";

        static void Main(string[] args)
        {
            // Create a new Queue with custom settings
            string connectionString =
                CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");

            var namespaceManager =
                NamespaceManager.CreateFromConnectionString(connectionString);

            if (!namespaceManager.QueueExists(c_testqueue))
            {
                MessageBox.Show("Queue does not exist.");
                throw new Exception("Queue does not exist.");
            }

            QueueClient client = QueueClient.CreateFromConnectionString(connectionString, c_testqueue);

            while (true)
            {
                BrokeredMessage message = client.Receive();

                if (message == null)
                {
                    continue;
                }
                try
                {
                    Stream fstream = message.GetBody<Stream>();
                    byte[] buffer = new byte[fstream.Length];
                    fstream.Read(buffer, 0, (int)fstream.Length);
                    File.WriteAllBytes(@"C:\users\roberthar\pictures\testpic.png", buffer);
                    fstream.Close();

                    Process paint = new Process();
                    paint.StartInfo.FileName = @"C:\Windows\System32\mspaint.exe";
                    paint.StartInfo.Arguments = @"C:\users\roberthar\pictures\testpic.png";
                    paint.Start();

                    Thread.Sleep(3000);

                    paint.Close();

                    // Remove message from queue
                    message.Complete();
                }
                catch (Exception exception)
                {
                    // Indicate a problem, unlock message in queue
                    message.Abandon();
                }
            }
        }
    }
}

person rharrison33    schedule 14.11.2014    source источник
comment
Извините, я понимаю, что это очень поздно, но я бы порекомендовал вам добавить какое-либо ведение журнала в код получения. Этот код перехватывает любое исключение (связанное с очередью или файлом), а затем отбрасывает сообщение без регистрации. Вы не увидите реальную ошибку (и, следовательно, не сможете ее исправить), пока не зарегистрируете сведения об исключении.   -  person Joon    schedule 04.04.2017


Ответы (2)


Ограничение на размер сообщения составляет 256 КБ, но оно включает как заголовки, так и тело сообщения, где максимальный размер заголовка составляет 64 КБ. В вашем случае заголовок не является проблемой (менее 1 КБ)

Я запускаю ваш пример с небольшими изменениями:

            string filePath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory,
                string.Format("testpic_{0}_{1}_{2}.png", now.Hour, now.Minute, now.Second));
            File.WriteAllBytes(filePath, buffer);
            fstream.Close();

            Process paint = new Process();
            paint.StartInfo.FileName = @"C:\Windows\System32\mspaint.exe";
            paint.StartInfo.Arguments = filePath;
            paint.Start();

И мне удалось успешно отправить и получить сообщение с помощью этот 254 Изображение в КБ

Если ваше сообщение будет слишком большим, вы получите MessageSizeExceededException когда вы звоните client.Send(msg);

Вы можете запустить этот тест в своей очереди, чтобы увидеть, можете ли вы получать все сообщения, для меня он проходит.

    [Fact]
    public void MaxMessageSize()
    {
        var sender = CreateClient();
        var reciver = CreateClient();
        for (int i = 1; i < 255; i++)
        {
            var size = i*1024;
            var buffer = new byte[size];
            random.NextBytes(buffer);
             BrokeredMessage msg =
               new BrokeredMessage(buffer);
            msg.Properties["size"] = size;
            sender.Send(msg);
            var message  = reciver.Receive();
            Assert.NotNull(message);
            Assert.Equal(message.Properties["size"], size);
            var bufferReceived = message.GetBody<byte[]>();
            Assert.Equal(buffer, bufferReceived);
            message.Complete();
        }
    }

Полная суть здесь

person b2zw2a    schedule 15.11.2014
comment
Используя предоставленную вами суть (с простым рефакторингом для запуска из консольного приложения), я не смог получать сообщения, размер которых превышал 64 КБ. Не удается отправить только сообщение размером › 256 КБ. - person BFreeman; 09.03.2015

Я наткнулся на этот вопрос, помогая коллеге. (Я обещаю, что это был другой разработчик!)

Мы столкнулись с этой проблемой, когда он запускал код, который проверял queue.MessageCount (устанавливая его в переменную с именем myQMessageCount ), и в коде было «пока (myQMessageCount > 0)», и он сбрасывал счетчик очереди после каждого msg.Complete ( внутри того же цикла while)

Оказывается, .MessageCount — это «сумма» всех сообщений в очереди, включая Active (те, которые вы должны иметь возможность читать), недоставленные письма и другие.

Итак (1), исправление заключалось в том, что он изменил свой код, чтобы проверить ActiveMessageCount, а не .MessageCount.

                Microsoft.ServiceBus.Messaging.QueueDescription qd = myMicrosoftdotServiceBusdotNamespaceManager.GetQueue(qName);
                string deadLetterQueueName = QueueClient.FormatDeadLetterPath(qd.Path);
                int activeMessageCount = qd.MessageCountDetails.ActiveMessageCount;
                int deadLetterMessageCount = qd.MessageCountDetails.DeadLetterMessageCount;
                int scheduledMessageCount = qd.MessageCountDetails.ScheduledMessageCount;
                int transferDeadLetterMessageCount = qd.MessageCountDetails.TransferDeadLetterMessageCount;
                int transferMessageCount = qd.MessageCountDetails.TransferMessageCount;

и (2) после того, как мы обсудили это, вероятно, неразумно продолжать проверять ActiveMessageCount и просто позволить возвращаемому null BrokeredMessage быть проверкой того, что в очереди больше нет сообщений.

Тем не мение. Я публикую этот ответ здесь для будущих читателей, которые могут застрять в каком-то пользовательском коде очереди чтения, который они пишут.

person granadaCoder    schedule 15.09.2017