Когда TransformBlock
имеет MaxDegreeOfParallelism > 1
и BoundedCapacity
, которые не являются неограниченными, почему он откладывает получение дальнейших сообщений, в то время как есть одна длительная задача, несмотря на то, что во входной очереди есть емкость?
Возьмите следующее консольное приложение. Он создает TransformBlock с MaxDegreeOfParallelism = 5
и BoundedCapacity = 5
, а затем передает ему 100 сообщений. Когда блок обрабатывает сообщение x == 50
, он задерживает эту задачу на 10 секунд.
TransformBlock<int, string> DoSomething = new TransformBlock<int, string>(async (x) => {
if (x == 50)
{
Console.WriteLine("x == 50 reached, delaying for 10 seconds.");
await Task.Delay(10000);
}
Console.WriteLine($"processed message {x}");
return x.ToString();
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 5 });
DoSomething.LinkTo(DataflowBlock.NullTarget<string>()); // ensure we empty the transform block
for (int i = 0; i < 100; i++)
{
Stopwatch blockedTime = Stopwatch.StartNew();
await DoSomething.SendAsync(i).ConfigureAwait(false);
blockedTime.Stop();
Console.WriteLine($"Submitted {i}\tBlocked for {blockedTime.ElapsedMilliseconds}ms.");
}
DoSomething.Complete();
await DoSomething.Completion;
Console.WriteLine("Completed.");
Console.ReadKey();
Результаты показывают, что все сообщения 50-54 были получены блоком. Сообщения 51-54 завершены, затем в окне консоли не отображается никаких выходных данных в течение 10 секунд, прежде чем будет показано, что сообщение 50 завершено и сообщение 55 может быть получено блоком.
...
Submitted 50 Blocked for 0ms.
Submitted 51 Blocked for 0ms.
processed message 51
Submitted 52 Blocked for 0ms.
x == 50 reached, delaying for 10 seconds.
processed message 52
processed message 53
Submitted 53 Blocked for 0ms.
Submitted 54 Blocked for 0ms.
processed message 54 // when run, 10 seconds pause happens after displaying this line
processed message 50
processed message 55
Submitted 55 Blocked for 9998ms.
...
Почему блок преобразования не продолжает заполнять блок до ограниченной емкости, равной 5, и не использует остальные 4 степени параллелизма для продолжения обработки сообщений?
ActionBlock
не проявляет этих симптомов и продолжает обрабатывать сообщения на других доступных параллельных линиях.
Неограниченная емкость TransformBlock
также не проявляет эти симптомы.