Dataflow Task.WhenAll причины Задача была отменена Исключение

Я новичок в Dataflow и следую этому пошаговому руководству Как отменить блокировку потока данных.
Я сначала нажимаю кнопку "Добавить", а затем нажимаю "Отмена", но получаю исключение "Задание было отменено Исключение" после нажатия кнопки отмены. Я не могу найти способ устранить эту ошибку.
Буду признателен за любую помощь.
Обновление: код для демонстрации:

    public partial class Form1 : Form
{
    CancellationTokenSource cancellationTokenSource;
    TransformBlock<WorkItem, WorkItem> startWork;
    ActionBlock<WorkItem> completeWork;
    ActionBlock<ToolStripProgressBar> incProgress;
    ActionBlock<ToolStripProgressBar> decProgress;
    TaskScheduler uiTaskScheduler;
    public Form1()
    {
        InitializeComponent();
        uiTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();
        Cancel.Enabled = false;
    }

    private void Add_Click(object sender, EventArgs e)
    {
        if (!Cancel.Enabled)
        {
            CreatePipeline();
            Cancel.Enabled = true;
        }

        for (int i = 0; i < 20; i++)
        {
            toolStripProgressBar1.Value++;
            startWork.Post(new WorkItem());
        }
    }

    private async void Cancel_Click(object sender, EventArgs e)
    {
        Add.Enabled = false;
        Cancel.Enabled = false;

        cancellationTokenSource.Cancel();

        try
        {
             await Task.WhenAll(
                completeWork.Completion,
                incProgress.Completion,
                decProgress.Completion);
        }
        catch (OperationCanceledException)
        {

            throw;
        }
        toolStripProgressBar4.Value += toolStripProgressBar1.Value;
        toolStripProgressBar4.Value += toolStripProgressBar2.Value;

        // Reset the progress bars that track the number of active work items.
        toolStripProgressBar1.Value = 0;
        toolStripProgressBar2.Value = 0;

        // Enable the Add Work Items button.      
        Add.Enabled = true;
    }
    private void CreatePipeline()
    {
        cancellationTokenSource = new CancellationTokenSource();

        startWork = new TransformBlock<WorkItem, WorkItem>(workItem =>
        {
            workItem.DoWork(250, cancellationTokenSource.Token);
            decProgress.Post(toolStripProgressBar1);
            incProgress.Post(toolStripProgressBar2);
            return workItem;
        },
        new ExecutionDataflowBlockOptions
        {
            CancellationToken = cancellationTokenSource.Token
        });

        completeWork = new ActionBlock<WorkItem>(workItem =>
        {
            workItem.DoWork(1000, cancellationTokenSource.Token);
            decProgress.Post(toolStripProgressBar2);
            incProgress.Post(toolStripProgressBar3);
        },
        new ExecutionDataflowBlockOptions
        {
            CancellationToken = cancellationTokenSource.Token,
            MaxDegreeOfParallelism = 2
        });

        startWork.LinkTo(completeWork);

        startWork.Completion.ContinueWith(delegate { completeWork.Complete(); },cancellationTokenSource.Token);
        incProgress = new ActionBlock<ToolStripProgressBar>(progress =>
        {
            progress.Value++;
        },
        new ExecutionDataflowBlockOptions
        {
            CancellationToken = cancellationTokenSource.Token,
            TaskScheduler = uiTaskScheduler
        });

        decProgress = new ActionBlock<ToolStripProgressBar>(progress => progress.Value--,
            new ExecutionDataflowBlockOptions
            {
                CancellationToken = cancellationTokenSource.Token,
                TaskScheduler = uiTaskScheduler
            });

    }

    class WorkItem
    {
        public void DoWork(int milliseconds, CancellationToken cancellationToken)
        {
            if (cancellationToken.IsCancellationRequested == false)
            {
                Thread.Sleep(milliseconds);
            }
        }
    }
}

person Edward    schedule 29.11.2017    source источник
comment
Лучше публиковать соответствующий код в самом вопросе, а не публиковать ссылку на огромный учебник.   -  person Evk    schedule 29.11.2017
comment
Что ж, если вы отмените задачу, вы не должны удивляться, получив исключение TaskCanceledException. Это просто отражает то, что вы сделали   -  person Sir Rufo    schedule 29.11.2017
comment
@SirRufo Как избежать этого исключения вместо обработки этого исключения?   -  person Edward    schedule 29.11.2017
comment
Ну не перекинуть пойманное исключение? - Кстати, у вас есть обработка исключений для этого; o)   -  person Sir Rufo    schedule 29.11.2017
comment
Не имеет отношения к вашей проблеме, но startWork.Completion.ContinueWith не нужен. Вам просто нужно распространить завершение. Кроме того, incProgress и decProgress ActionBlock не приносят вам много пользы, они могут быть просто Progress‹ Т›   -  person JSteward    schedule 29.11.2017
comment
@JSteward Спасибо, я новичок в Dataflow, не могли бы вы поделиться с нами кодом, связанным с Propagate и Progree‹T›?   -  person Edward    schedule 30.11.2017
comment
любая причина для голосования?   -  person Edward    schedule 30.11.2017


Ответы (2)


Как указал @SirRufo, решение вашего вопроса заключается в том, чтобы просто не выдавать исключение повторно после того, как вы его поймали. Но чтобы выделить некоторые другие методы, которые вы можете использовать с потоком данных, как обсуждалось в комментариях, я собрал небольшой образец. Я пытался сохранить дух и цель вашего исходного кода нетронутыми. С этой целью; исходный код не показывал нормальное завершение потока, а не отмену, поэтому я также оставил его здесь.

using System;
using System.Data;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Windows.Forms;

namespace WindowsFormsApp1 {
    public partial class Form1 : Form {

        private CancellationTokenSource cancellationTokenSource;
        private TransformBlock<WorkItem, WorkItem> startWork;
        private ActionBlock<WorkItem> completeWork;
        private IProgress<int> progressBar1Value;
        private IProgress<int> progressBar2Value;

        public Form1() {
            InitializeComponent();
            btnCancel.Enabled = false;
        }

        private async void btnAdd_Click(object sender, EventArgs e) {
            if(!btnCancel.Enabled) {
                CreatePipeline();
                btnCancel.Enabled = true;
            }
            var data = Enumerable.Range(0, 20).Select(_ => new WorkItem());
            foreach(var workItem in data) {
                await startWork.SendAsync(workItem);
                progressBar1.Value++;                
            }
        }

        private async void btnCancel_Click(object sender, EventArgs e) {
            btnAdd.Enabled = false;
            btnCancel.Enabled = false;

            cancellationTokenSource.Cancel();

            await completeWork.Completion.ContinueWith(tsk => this.Invoke(new Action(() => this.Text = "Flow Cancelled")), 
                                                       TaskContinuationOptions.OnlyOnCanceled);

            progressBar4.Value += progressBar1.Value;
            progressBar4.Value += progressBar2.Value;

            // Reset the progress bars that track the number of active work items.
            progressBar1.Value = 0;
            progressBar2.Value = 0;

            // Enable the Add Work Items button.      
            btnAdd.Enabled = true;
        }

        private void CreatePipeline() {
            cancellationTokenSource = new CancellationTokenSource();
            progressBar1Value = new Progress<int>(_ => progressBar1.Value++);
            progressBar2Value = new Progress<int>(_ => progressBar2.Value++);

            startWork = new TransformBlock<WorkItem, WorkItem>(async workItem => {
                await workItem.DoWork(250, cancellationTokenSource.Token);
                progressBar1Value.Report(0); //Value is ignored since the progressbar value is simply incremented
                progressBar2Value.Report(0); //Value is ignored since the progressbar value is simply incremented
                return workItem;
            },
            new ExecutionDataflowBlockOptions {
                CancellationToken = cancellationTokenSource.Token
            });

            completeWork = new ActionBlock<WorkItem>(async workItem => {
                await workItem.DoWork(1000, cancellationTokenSource.Token);
                progressBar1Value.Report(0); //Value is ignored since the progressbar value is simply incremented
                progressBar2Value.Report(0); //Value is ignored since the progressbar value is simply incremented
            },
            new ExecutionDataflowBlockOptions {
                CancellationToken = cancellationTokenSource.Token,
                MaxDegreeOfParallelism = 2
            });

            startWork.LinkTo(completeWork, new DataflowLinkOptions() { PropagateCompletion = true });
        }
    }

    public class WorkItem {
        public async Task DoWork(int milliseconds, CancellationToken cancellationToken) {
            if(cancellationToken.IsCancellationRequested == false) {
                await Task.Delay(milliseconds);
            }
        }
    }
}
person JSteward    schedule 30.11.2017

После проверки кода я отпустил, что задачи будут отменены, если я нажму Отмена.

await Task.WhenAll(
                completeWork.Completion,
                incProgress.Completion,
                decProgress.Completion);   

Но в приведенном выше коде Task.WhenAll необходимо, чтобы все задачи возвращали статус завершения, тогда «Задача была отменена, исключение», как и ожидалось, если она возвращалась отмененной, а не завершенной.
Для возможного способа чтобы решить эту проблему, мы должны вернуть задачу, если мы отменили задачу, и приведенный ниже код работает для меня.

       await Task.WhenAll(
           completeWork.Completion.ContinueWith(task => cancelWork(task, "completeWork"), TaskContinuationOptions.OnlyOnCanceled),
           incProgress.Completion.ContinueWith(task => cancelWork(task, "incProgress"), TaskContinuationOptions.OnlyOnCanceled),
           decProgress.Completion.ContinueWith(task => cancelWork(task, "decProgress"), TaskContinuationOptions.OnlyOnCanceled));

Это разумно?

person Edward    schedule 30.11.2017