BatchBlock создает пакет с элементами, отправленными после TriggerBatch().

У меня есть конвейер потока данных, состоящий из нескольких блоков. Когда элементы проходят через мой конвейер обработки, я хочу сгруппировать их по полю A. Для этого у меня есть BatchBlock с высоким BoundedCapacity. В нем я храню свои элементы, пока не решу, что их надо освободить. Поэтому я вызываю метод TriggerBatch().

private void Forward(TStronglyTyped data)
{
    if (ShouldCreateNewGroup(data))
    {
        GroupingBlock.TriggerBatch();
    }

 GroupingBlock.SendAsync(data).Wait(SendTimeout);
}

Вот как это выглядит. Проблема в том, что созданный пакет иногда содержит следующий размещенный элемент, которого там быть не должно.

Проиллюстрировать:

BatchBlock.InputQueue = {A,A,A}
NextElement = B //we should trigger a Batch!
BatchBlock.TriggerBatch()
BatchBlock.SendAsync(B);

На данный момент я ожидаю, что моя партия будет {A,A,A}, но это {A,A,A,B}

Например, TriggerBatch() был асинхронным, а SendAsync на самом деле был выполнен до того, как пакет был фактически создан.

Как я могу это решить? Я явно не хочу туда ставить Task.Wait(x) (я пытался, и это работает, но тогда производительность плохая, конечно).


person wojciech_rak    schedule 09.03.2016    source источник
comment
Вы не объясняете, как вы вызываете Forward, но почти наверняка проблема в том, что между вызовами ShouldCreate и TriggerBatch ему было отправлено другое сообщение. В этом нет ничего плохого, просто так оно и должно работать. Вы не должны пытаться активировать BatchBlock извне. Единственный способ избежать таких проблем — запустить его изнутри. Создайте пользовательский блок с DataflowBlock.Encapsulate, который предоставляет ActionBlock в качестве входных данных и BatchBlock или BufferBlock в качестве выходных данных. В ActionBlock проверьте ввод и либо добавьте сообщение, либо запустите пакет   -  person Panagiotis Kanavos    schedule 09.03.2016
comment
Проверьте этот пример, который создает SlidingWindow блокировать, используя Encapsulate, используя ActionBlock, Queue для хранения и BufferBlock для вывода   -  person Panagiotis Kanavos    schedule 09.03.2016
comment
Forward вызывается из ActionBlock, предшествующего BatchBlock. Я отключил параллелизм, поэтому каждый блок должен обрабатывать только одно сообщение за раз, верно?   -  person wojciech_rak    schedule 09.03.2016
comment
Кто публикует сообщения в BatchBlock? Его нельзя связать с ActionBlock, так откуда же он берет данные? В любом случае вам не нужен BatchBlock, вы можете использовать простую очередь, список и т. д. и просто опубликовать массив всех кэшированных объектов, когда это необходимо. Это то, что делает пример SlidingWindow.   -  person Panagiotis Kanavos    schedule 09.03.2016
comment
Ты прав. Я немного изменил пример SlidingWindow. В части ActionBlock я проверяю, следует ли выталкивать текущие данные наружу. Теперь все работает так, как я хотел. Спасибо!   -  person wojciech_rak    schedule 09.03.2016


Ответы (2)


Я также столкнулся с этой проблемой, пытаясь вызвать TriggerBatch не в том месте. Как уже упоминалось, пример SlidingWindow с использованием DataflowBlock.Encapsulate является ответом здесь, но потребовалось некоторое время для адаптации, поэтому я решил поделиться своим завершенным блоком.

Мой ConditionalBatchBlock создает пакеты до максимального размера, возможно, раньше, если выполняется определенное условие. В моем конкретном сценарии мне нужно было создать пакеты из 100, но всегда создавать новые пакеты при обнаружении определенных изменений в данных.

public static IPropagatorBlock<T, T[]> CreateConditionalBatchBlock<T>(int batchSize, Func<Queue<T>, T, bool> condition)
{
    var queue = new Queue<T>();

    var source = new BufferBlock<T[]>();

    var target = new ActionBlock<T>(async item =>
    {
        // start a new batch if required by the condition
        if (condition(queue, item))
        {
            await source.SendAsync(queue.ToArray());
            queue.Clear();
        }

        queue.Enqueue(item);

        // always send a batch when the max size has been reached
        if (queue.Count == batchSize)
        {
            await source.SendAsync(queue.ToArray());
            queue.Clear();
        }
    });

    // send any remaining items
    target.Completion.ContinueWith(async t =>
    {
        if (queue.Any())
            await source.SendAsync(queue.ToArray());

        source.Complete();
    });

    return DataflowBlock.Encapsulate(target, source);
}

Параметр condition может быть проще в вашем случае. Мне нужно было посмотреть на очередь, а также на текущий элемент, чтобы решить, создавать ли новую партию.

Я использовал это так:

public async Task RunExampleAsync<T>()
{
    var conditionalBatchBlock = CreateConditionalBatchBlock<T>(100, (queue, currentItem) => ShouldCreateNewBatch(queue, currentItem));

    var actionBlock = new ActionBlock<T[]>(async x => await PerformActionAsync(x));

    conditionalBatchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

    await ReadDataAsync<T>(conditionalBatchBlock);

    await actionBlock.Completion;
}
person Loren Paulsen    schedule 05.04.2016

Вот специализированная версия Лорен Полсен CreateConditionalBatchBlock метод. Этот принимает аргумент Func<TItem, TKey> keySelector и выдает новый пакет каждый раз, когда принимается элемент с другим ключом.

public static IPropagatorBlock<TItem, TItem[]> CreateConditionalBatchBlock<TItem, TKey>(
    Func<TItem, TKey> keySelector,
    DataflowBlockOptions dataflowBlockOptions = null,
    int maxBatchSize = DataflowBlockOptions.Unbounded,
    IEqualityComparer<TKey> keyComparer = null)
{
    if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
    if (maxBatchSize < 1 && maxBatchSize != DataflowBlockOptions.Unbounded)
        throw new ArgumentOutOfRangeException(nameof(maxBatchSize));

    keyComparer = keyComparer ?? EqualityComparer<TKey>.Default;
    var options = new ExecutionDataflowBlockOptions();
    if (dataflowBlockOptions != null)
    {
        options.BoundedCapacity = dataflowBlockOptions.BoundedCapacity;
        options.CancellationToken = dataflowBlockOptions.CancellationToken;
        options.MaxMessagesPerTask = dataflowBlockOptions.MaxMessagesPerTask;
        options.TaskScheduler = dataflowBlockOptions.TaskScheduler;
    }

    var output = new BufferBlock<TItem[]>(options);

    var queue = new Queue<TItem>(); // Synchronization is not needed
    TKey previousKey = default;

    var input = new ActionBlock<TItem>(async item =>
    {
        var key = keySelector(item);
        if (queue.Count > 0 && !keyComparer.Equals(key, previousKey))
        {
            await output.SendAsync(queue.ToArray()).ConfigureAwait(false);
            queue.Clear();
        }
        queue.Enqueue(item);
        previousKey = key;

        if (queue.Count == maxBatchSize)
        {
            await output.SendAsync(queue.ToArray()).ConfigureAwait(false);
            queue.Clear();
        }
    }, options);

    _ = input.Completion.ContinueWith(async t =>
    {
        if (queue.Count > 0)
        {
            await output.SendAsync(queue.ToArray()).ConfigureAwait(false);
            queue.Clear();
        }
        if (t.IsFaulted)
        {
            ((IDataflowBlock)output).Fault(t.Exception.InnerException);
        }
        else
        {
            output.Complete();
        }
    }, TaskScheduler.Default);

    return DataflowBlock.Encapsulate(input, output);
}
person Theodor Zoulias    schedule 26.06.2020
comment
Более тяжелую пользовательскую реализацию BatchBlock можно найти здесь< /а>. - person Theodor Zoulias; 27.06.2020