MessageLockLostException: поставленная блокировка недействительна. Либо срок блокировки истек, либо сообщение уже удалено из очереди.

Я пытаюсь получить сообщение из очереди с помощью триггера очереди служебной шины и выполнить некоторую работу, выполнение которой займет некоторое время. Я не хочу, чтобы другой процессор выбирал сообщение, пока я обрабатываю сообщение. У меня есть следующая конфигурация в host.json. Когда я получаю сообщение из очереди на await Receiver.CompleteAsync (lockToken);, я получаю исключение. Предоставленная блокировка недействительна. Либо срок блокировки истек, либо сообщение уже удалено из очереди.

 "serviceBus": {
  "prefetchCount": 1,
  "autoRenewTimeout": "00:05:00",
  "messageHandlerOptions": {
    "autoComplete": false,
    "maxConcurrentCalls": 1,
    "maxAutoRenewDuration": "00:04:00"
  }
}

Код из функции Azure приведен ниже.

 public static void Run([ServiceBusTrigger("testqueue", Connection = "AzureServiceBus.ConnectionString")]Message message, MessageReceiver messageReceiver,ILogger log)
    {
        log.LogInformation($"C# ServiceBus queue trigger function processed message: {messageReceiver.ClientId}");
        log.LogInformation($"Message={Encoding.UTF8.GetString(message.Body)}");
        string lockToken = message.SystemProperties.LockToken;
        log.LogInformation($"Processing Message:={Encoding.UTF8.GetString(message.Body)}");
        DoSomeJob(messageReceiver, lockToken,log);
    }
    public static async void DoSomeJob(MessageReceiver receiver,string lockToken, ILogger log)
    {
        try
        {

            await Task.Delay(360000);
            await receiver.CompleteAsync(lockToken);

        }
        catch (Exception ex)
        {

            log.LogInformation($"Error In Job={ex}");
        }
    
    }

person manas sahu    schedule 25.08.2020    source источник
comment
Вы не заявили о проблеме.   -  person Ian Kemp    schedule 25.08.2020
comment
Я предполагаю вопрос - почему потеряна блокировка, несмотря на то, что maxAutoRenewDuration установлено на 10 минут?   -  person Sean Feldman    schedule 25.08.2020


Ответы (1)


Когда вы настраиваете функцию Azure, запускаемую служебной шиной Azure с maxAutoRenewDuration равным 10 минутам, вы запрашиваете у триггера продлить блокировку до 10 минут. Это не гарантированная операция, поскольку она инициируется на стороне клиента, а максимальный период блокировки составляет 5 минут. Учитывая это, операция по расширению блокировки может завершиться неудачно, и блокировка будет снята, в результате чего другой экземпляр вашей функции будет обрабатывать ее одновременно, в то время как исходная обработка все еще выполняется.

Еще один аспект, на который следует обратить внимание, - это prefetchCount, который установлен на 100, и maxConcurrentCalls, который установлен на 32. Это означает, что вы получаете до 100 сообщений и обрабатываете до 32 из них. Я не знаю, работает ли фактический код функции дольше 50 секунд (в вашем примере), но предварительно загруженные блокировки сообщений не обновляются автоматически. Следовательно, если предварительно загруженные сообщения не обрабатываются в течение MaxLockDuration времени очереди (которое по умолчанию составляет менее 5 минут), некоторые из этих предварительно загруженных сообщений начнут обработку, дополнительное обновление и завершение после потери блокировки.

Я бы посоветовал:

  1. Убедитесь, что MaxLockDuration не должен быть слишком коротким для вашей предварительной выборки и параллелизма.
  2. Обновите prefetchCount, чтобы не допустить чрезмерной выборки.
  3. Если обработка одного сообщения может быть выполнена в течение 5 минут или меньше, лучше предпочитайте это, а не автоматическое продление.
person Sean Feldman    schedule 25.08.2020
comment
что, если обработка одного сообщения займет более 5 минут? - person manas sahu; 25.08.2020
comment
Либо вы полагаетесь на автоматическое продление, которое не гарантируется, либо разрабатываете свое решение иначе ???? - person Sean Feldman; 25.08.2020