В Azure WebJobs в классе OnMessageOptions
я вызываю метод QueueClient.Complete(Guid)
, устанавливая для флага AutoComplete
значение true, и сообщения, кажется, отлично удаляются из очереди при запуске функции ProcessQueue. Счетчик активных сообщений уменьшается на 1 после успешной обработки каждого сообщения. Однако, когда я хочу повторно поставить сообщение (потому что оно не может быть обработано в настоящее время) обратно в очередь, которая запускает функцию служебной шины, в виде нового сообщения через посредника через минуту, используя BrokeredMessage.ScheduledEnqueueTimeUtc
, кажется, что оно не работает. Похоже, что изначально количество запланированных сообщений увеличивается. Я возвращаюсь в очередь через несколько часов и вижу тысячи активных сообщений. Копии одного и того же сообщения. Что происходит? Я ожидал, что сообщение будет удалено из очереди из-за QueueClient.Complete(Guid)
, и новое запланированное сообщение будет его заменой.
Немного подробностей:
Чтобы отправить сообщение, я делаю следующее:
var queueclient = QueueClient.CreateFromConnectionString(connectionString, queueName);
queueclient.Send(message);
queueclient.close();
Внутри WebJob я создал объект ServiceBusConfiguration
, для которого требуется объект onMessageOptions
, где я установил AutoComplete=true
. Я передаю объект ServiceBusConfiguration
методу JobHostConfiguration.UserServiceBus
.
Внутри функции, инициированной очередью служебной шины WebJob, я снова делаю следующее для повторной постановки в очередь, сначала снова создавая новый экземпляр сообщения через посредника.
// если еще не доступно для обработки, подайте заявку повторно ...
var queueclient = QueueClient.CreateFromConnectionString(connectionString, queueName);
queueclient.Send(message);
queueclient.close();
Я не делаю следующие / использую обратные вызовы, может быть, поэтому они не работают?
var options = new OnMessageOptions();
options.AutoComplete = false; // to call complete ourselves
Обратный вызов для обработки полученных сообщений
client.OnMessage(m =>
{
var clone = m.Clone();
clone.ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddSeconds(60);
client.Send(clone);
m.Complete();
}, options);