Блок преобразования с параллелизмом и ограниченной емкостью, откладывающей поведение сообщения

Когда TransformBlock имеет MaxDegreeOfParallelism > 1 и BoundedCapacity, которые не являются неограниченными, почему он откладывает получение дальнейших сообщений, в то время как есть одна длительная задача, несмотря на то, что во входной очереди есть емкость?

Возьмите следующее консольное приложение. Он создает TransformBlock с MaxDegreeOfParallelism = 5 и BoundedCapacity = 5, а затем передает ему 100 сообщений. Когда блок обрабатывает сообщение x == 50, он задерживает эту задачу на 10 секунд.

TransformBlock<int, string> DoSomething = new TransformBlock<int, string>(async (x) => {
    if (x == 50)
    {
        Console.WriteLine("x == 50 reached, delaying for 10 seconds.");
        await Task.Delay(10000);
    }
    Console.WriteLine($"processed message {x}");
    return x.ToString();
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 5 });

DoSomething.LinkTo(DataflowBlock.NullTarget<string>()); // ensure we empty the transform block

for (int i = 0; i < 100; i++)
{
    Stopwatch blockedTime = Stopwatch.StartNew();
    await DoSomething.SendAsync(i).ConfigureAwait(false);
    blockedTime.Stop();
    Console.WriteLine($"Submitted {i}\tBlocked for {blockedTime.ElapsedMilliseconds}ms.");
}

DoSomething.Complete();
await DoSomething.Completion;
Console.WriteLine("Completed.");
Console.ReadKey();

Результаты показывают, что все сообщения 50-54 были получены блоком. Сообщения 51-54 завершены, затем в окне консоли не отображается никаких выходных данных в течение 10 секунд, прежде чем будет показано, что сообщение 50 завершено и сообщение 55 может быть получено блоком.

...
Submitted 50    Blocked for 0ms.
Submitted 51    Blocked for 0ms.
processed message 51
Submitted 52    Blocked for 0ms.
x == 50 reached, delaying for 10 seconds.
processed message 52
processed message 53
Submitted 53    Blocked for 0ms.
Submitted 54    Blocked for 0ms.
processed message 54 // when run, 10 seconds pause happens after displaying this line
processed message 50 
processed message 55
Submitted 55    Blocked for 9998ms.
...

Почему блок преобразования не продолжает заполнять блок до ограниченной емкости, равной 5, и не использует остальные 4 степени параллелизма для продолжения обработки сообщений?

ActionBlock не проявляет этих симптомов и продолжает обрабатывать сообщения на других доступных параллельных линиях.

Неограниченная емкость TransformBlock также не проявляет эти симптомы.


person SeanOB    schedule 01.07.2020    source источник


Ответы (1)


Поскольку по умолчанию параметр EnsureOrdered равен true, он пытается сохранить порядок результатов. Другими словами, он не может продолжить обработку после BoundedCapacity, потому что ему необходимо поддерживать порядок, который представляет собой противодействие, которое вы видите в своих тестах.

Кроме того, ActionBlock не демонстрирует такого поведения, поскольку он не выводит данные ни в какой другой блок (это, так сказать, тупик), и поэтому отсутствует концепция упорядочения, противодействие ограничивается только ограниченной пропускной способностью и степенью параллелизма.

DataflowBlockOptions. Убедитесь, что свойство Ordered

По умолчанию блоки потока данных обеспечивают упорядоченность обработки сообщений. Установка EnsureOrdered в false сообщает блоку, что он может ослабить этот порядок, если он в состоянии это сделать. Это может быть полезно, если немедленная доступность обработанного результата важнее, чем поддержание порядка ввода-вывода.

Исправление состоит в том, чтобы удалить упорядоченное требование

new ExecutionDataflowBlockOptions 
       { 
           BoundedCapacity = 5,
           MaxDegreeOfParallelism = 5 ,
           EnsureOrdered = false
       });
person TheGeneral    schedule 01.07.2020
comment
Отличное объяснение! Легко забыть о влиянии поведения EnsureOrdered = true по умолчанию на то, как данные передаются в конвейере. - person Theodor Zoulias; 01.07.2020
comment
@TheodorZoulias да, это одна из тех вещей, которые я научился отключать в большинстве своих реализаций. Я хочу сырой силы! - person TheGeneral; 01.07.2020