У меня есть конвейер потока данных, состоящий из нескольких блоков. Когда элементы проходят через мой конвейер обработки, я хочу сгруппировать их по полю A
. Для этого у меня есть BatchBlock
с высоким BoundedCapacity
. В нем я храню свои элементы, пока не решу, что их надо освободить. Поэтому я вызываю метод TriggerBatch()
.
private void Forward(TStronglyTyped data)
{
if (ShouldCreateNewGroup(data))
{
GroupingBlock.TriggerBatch();
}
GroupingBlock.SendAsync(data).Wait(SendTimeout);
}
Вот как это выглядит. Проблема в том, что созданный пакет иногда содержит следующий размещенный элемент, которого там быть не должно.
Проиллюстрировать:
BatchBlock.InputQueue = {A,A,A}
NextElement = B //we should trigger a Batch!
BatchBlock.TriggerBatch()
BatchBlock.SendAsync(B);
На данный момент я ожидаю, что моя партия будет {A,A,A}
, но это {A,A,A,B}
Например, TriggerBatch()
был асинхронным, а SendAsync
на самом деле был выполнен до того, как пакет был фактически создан.
Как я могу это решить? Я явно не хочу туда ставить Task.Wait(x)
(я пытался, и это работает, но тогда производительность плохая, конечно).
Forward
, но почти наверняка проблема в том, что между вызовамиShouldCreate
иTriggerBatch
ему было отправлено другое сообщение. В этом нет ничего плохого, просто так оно и должно работать. Вы не должны пытаться активировать BatchBlock извне. Единственный способ избежать таких проблем — запустить его изнутри. Создайте пользовательский блок с DataflowBlock.Encapsulate, который предоставляет ActionBlock в качестве входных данных и BatchBlock или BufferBlock в качестве выходных данных. В ActionBlock проверьте ввод и либо добавьте сообщение, либо запустите пакет - person Panagiotis Kanavos   schedule 09.03.2016Forward
вызывается из ActionBlock, предшествующего BatchBlock. Я отключил параллелизм, поэтому каждый блок должен обрабатывать только одно сообщение за раз, верно? - person wojciech_rak   schedule 09.03.2016ActionBlock
я проверяю, следует ли выталкивать текущие данные наружу. Теперь все работает так, как я хотел. Спасибо! - person wojciech_rak   schedule 09.03.2016