Добър подход за изпълняване на задачи синхронно с FIFO?

В момента правя първите си стъпки с async/await и задачи в .NET и съм много развълнуван колко лесно е да изпълнявам нещата асинхронно! В момента обаче трябва да комуникирам с устройства чрез SerialPort. Тъй като е възможна само една връзка по едно и също време, току-що написах няколко метода за разширение, за да стартирам всички тези методи, идващи от различни задачи/нишки, синхронно и в първи ред:

public static class Extensions
{
    private readonly static object LockObject = new object();

    public static Task<TResult> RunAfter<TResult>(this Task<TResult> task, ConcurrentQueue<Task> others)
        => (Task<TResult>)task.RunAllSynchronously(others);

    public static Task RunAfter(this Task task, ConcurrentQueue<Task> others)
        => task.RunAllSynchronously(others);

    private static Task RunAllSynchronously(this Task task, ConcurrentQueue<Task> others)
    {
        if (others == null) throw new ArgumentNullException("The value of " + nameof(others) + " is null!");
        lock (LockObject)
        {
            others.Enqueue(task);
            Task currentTask;
            while (others.TryDequeue(out currentTask))
            {
                currentTask.RunSynchronously();
                if (currentTask == task) break;
            }
        }
        return task;
    }
}

Този подход изглежда ли добър начин или такъв случай трябва да се третира по различен начин?


person Daniel    schedule 10.08.2015    source източник
comment
Изпълнявате всичко синхронно. Защо изобщо да използвате задачи? Бих предложил да прочетете това за това как наистина да се възползвате от асинхронността със сериен порт.   -  person Yuval Itzchakov    schedule 10.08.2015
comment
THX, определено ще прочета тази статия! Чудя се защо още не съм го намерил...   -  person Daniel    schedule 10.08.2015
comment
Даниел, първо, трябва да отговорите на собствения си въпрос, като добавите отговор, а не като част от въпроса си. Второ, вашето решение е подобно на крайната форма на моята. Трябва или да маркирате моя като правилен, или просто да добавите свой собствен отговор...   -  person Matías Fidemraizer    schedule 13.08.2015
comment
Успях, благодаря за съвета ;)   -  person Daniel    schedule 14.08.2015


Отговори (3)


Защо ги изпълнявате синхронно?

Трябва да изпълнявате задачи асинхронно и да използвате async и await, за да ги изпълнявате една по една:

 Task currentTask;
 while (others.TryDequeue(out currentTask))
 {
      await currentTask;
      if (currentTask == task) break;
 }

От друга страна, гледайки кода ви, не мога да намеря причина изобщо да използвам lock (синхронизиране на нишки). Синхронизирате нишки срещу някакъв споделен ресурс (т.е. някакъв обект, който може или не може да се очаква да бъде прочетен/променен от повече от нишка). Вашият метод може да бъде преработен до:

private static async Task RunAllAsync(this Task task, ConcurrentQueue<Task> others)
{
    // Design by contract rocks ;)
    // See:  https://msdn.microsoft.com/en-us/library/dd264808(v=vs.110).aspx
    Contracts.Requires(task != null);
    Contracts.Requires(others != null);

    others.Enqueue(task);

    // See how I've improved your loop. Since ConcurrentQueue.TryDequeue
    // will return false if other thread has called it already, your loop
    // should try to dequeue again until it returns true, and it should
    // break if dequeued task is the task against which the extension method
    // was called or the concurrent queue has no more items, to prevent a 
    // possible infinite loop
    do
    { 
       Task currentTask;
       if(others.TryDequeue(out currentTask))
          await currentTask;

    }
    while (currentTask == task || others.Count > 0);

    return task;
}

Актуализация

OP каза:

Вероятно съм забравил да кажа, че ConcurrentQueue е ресурсът, който трябва да се споделя между нишките. т.е. Task.RunAllSynchronously() се извиква при всяка нова задача (достъп до SerialPort) и това извикване може да идва от различна нишка. Освен това не мога да гарантирам, че RunAllSynchronously() просто се извиква, когато всички текущо изпълнявани (или поставени на опашка) задачи са завършени (можех, но затова трябваше да използвам нещо като заключване извън метода на разширение, което всъщност не е толкова хубаво да имаш метод на разширение.

Ето защо използвате ConcurrentQueue<T>. Безопасността на нишките се управлява вътрешно. Ако извикате ConcurrentQueue<T>.TryDequeue и повече от една нишка го извикат наведнъж, само един ще спечели, а други ще получат false като върната стойност и параметърът out няма да бъде присвоен. Вижте какво казва MSDN за това:

ConcurrentQueue обработва цялата синхронизация вътрешно. Ако две нишки извикат TryDequeue точно в един и същи момент, нито една операция не се блокира. Когато бъде открит конфликт между две нишки, едната нишка трябва да опита отново, за да извлече следващия елемент, а синхронизирането се обработва вътрешно.

TryDequeue се опитва да премахне елемент от опашката. Ако методът е успешен, елементът се премахва и методът връща true; в противен случай връща false. Това се случва атомарно по отношение на други операции в опашката. Ако опашката е била попълнена с код като q.Enqueue("a"); q.Enqueue("b"); q.Enqueue("c"); и две нишки едновременно се опитват да извадят елемент от опашката, едната нишка ще извади от опашката a, а другата нишка ще извади от опашката b. И двете извиквания към TryDequeue ще върнат true, защото и двете са успели да извадят елемент от опашката. Ако всяка нишка се върне, за да извади от опашката допълнителен елемент, една от нишките ще извади от опашката c и ще върне true, докато другата нишка ще намери опашката празна и ще върне false.

person Matías Fidemraizer    schedule 10.08.2015
comment
@YuvalItzchakov Щях да редактирам, за да отбележа, че не намирам причина да използвам lock в кода на OP.. - person Matías Fidemraizer; 10.08.2015
comment
@YuvalItzchakov Готово! - person Matías Fidemraizer; 10.08.2015
comment
Докато опитвах различни неща със задачи и async, забелязах, че не можете да използвате await в блок за заключване. В този случай наистина трябва да използвам заключване, нали? - person Daniel; 10.08.2015
comment
@Daniel Няма нужда от заключване - работите само с локални и безопасни за нишки обекти. Освен ако не трябва да гарантирате, че само една задача се изпълнява в даден момент, заключването е ненужно. И ако имате нужда от това, използвайте SemaphoreSlim вместо това. - person Luaan; 10.08.2015
comment
@Luaan благодаря. Вероятно съм забравил да кажа, че ConcurrentQueue е ресурсът, който трябва да се споделя между нишките. т.е. Task.RunAllSynchronously() се извиква при всяка нова задача (достъп до SerialPort) и това извикване може да идва от различна нишка. Освен това не мога да гарантирам, че RunAllSynchronously() просто се извиква, когато всички текущо изпълнявани (или поставени на опашка) задачи са завършени (можех, но затова трябваше да използвам нещо като заключване извън метода на разширение, което всъщност не е толкова хубаво да имаш метод на разширение. - person Daniel; 10.08.2015
comment
@Luaan Току-що намерих начин да използвам вече нещо като SemaphoreSlim (msdn.microsoft.com/en-us/library/ee789351%28v=vs.110%29.aspx), но за мен използването на методи за разширение изглежда малко по-красиво. - person Daniel; 10.08.2015
comment
@Daniel Целият смисъл на ConcurrentQueue е да осигури опашка, безопасна за нишки - тя е изрично проектирана да работи без заключвания. И внимавайте с RunSynchronously, не е задължително да се изпълнява вградено. Може би писането на собствен TaskScheduler би било по-добро решение за това? Не е съвсем очевидно какво се опитвате да направите. - person Luaan; 10.08.2015
comment
@Daniel Актуализирах отново отговора. Проверете преработения метод за разширение и как do while ще работи по-добре във вашия сценарий;) - person Matías Fidemraizer; 10.08.2015
comment
Благодаря и на двама ви! Първоначалният проблем беше, че мога да разреша само един достъп до SerialPort наведнъж. Затова реших, че мога да се справя с това, като използвам методи за разширение, за да изпълнявам само една задача след друга, а не паралелно. Поглеждайки назад, внедряването на собствен TaskScheduler изглежда е добра идея, тъй като изрично искам да огранича достъпа до една единствена задача (не ме интересува ред - само първи влязъл, първи излязъл). Бих гласувал за публикациите ви, но не мога поради по-слаба репутация. - person Daniel; 10.08.2015
comment
@MatíasFidemraizer благодаря ;) Използването на do while е наистина по-добър начин =) - person Daniel; 10.08.2015

Преди всичко:

Вие се възползвате само от async-await, ако програмата ви трябва да прави нещо друго, докато задачите ви се изпълняват.

Ако вашата главна нишка стартира задача и не прави нищо, освен да изчака тази задача да приключи, вашата главна нишка може да свърши работата сама. Това дори би било по-бързо.

Във вашия пример мога да си представя, че изпращането по серийната линия е значително по-бавно от вашата обработка. Така че мога да си представя, че докато една нишка е заета да изпраща данни по серийната линия, вашата нишка може да е заета със създаването на следващите данни, които трябва да бъдат изпратени. Или може би 10 нишки създават данни, които трябва да бъдат изпратени една след друга. Разбира се, в последния случай не е гарантирано в какъв ред ще бъдат изпратени данните.

Но нека го видим по-просто: една нишка създава данни със собствена скорост, докато друга нишка изпраща данни независимо по серийната линия.

Това крещи за модел производител - потребител: една нишка е производителят, тя произвежда елементи, които потребителят чете и обработва. След известно време производителят казва на потребителя, че вече не трябва да се очакват данни.

Ключовият обект в това е System.Threading.Tasks.Dataflow.BufferBlock. Вижте MSDN. Разделът за забележки казва, че се разпространява чрез nuget.

BufferBlock реализира два интерфейса:

  • ITargetBlock<T>, до който производителят да изпрати своя резултат
  • ISourceBlock<T> за потребителя, от който да прочете входа.

Да приемем, че използвате System.IO.Ports.SerialPort, за да изпратите вашите данни. Уви, този клас няма асинхронна поддръжка, така че трябва да го създадем сами. Да приемем, че искате да конвертирате обекти от тип T във формат, който може да бъде изпратен по серийната линия. Кодът би изглеждал така:

private void Write(T t)
{
    var dataToSend = ConvertToData(t);
    serialPort.Write(dataToSend);
}

Не е много асинхронно. Така че нека направим асинхронна функция от него:

private async Task WriteAsync(T t)
{
    return await Task.Run ( () =>
    {
        var dataToSend = ConvertToData(t);
        serialPort.Write(dataToSend);
    }
}

Или можете просто да извикате другата функция за запис:

return await Task.Run ( () => Write(t));

Забележка: ако се уверите, че има само една нишка, която ще използва тази функция, не е нужно да я заключвате.

Сега, след като имаме асинхронна функция за изпращане на обекти от тип T по серийната линия, нека създадем продуцент, който ще създава обекти от тип T и ще ги изпраща към буферния блок.

Ще го направя асинхронно, така че извикващата нишка да може да прави други неща, докато се произвеждат данни:

private BufferBlock<T> bufferBlock = new BufferBlock<T>();

private async Task ProduceAsync()
{
    while (objectsToProcessAvailable())
    {
        T nextObject = GetNextObjectToProcess()
        await bufferBlock.SendAsync(nextObject);
    }
    // nothing to process anymore: mark complete:
    bufferBlock.Complete();
}

Приемащата страна ще бъде направена от различна нишка:

private Task ConsumeAsync()
{
    // as long as there is something to process: fetch it and process it
    while (await bufferBlock.OutputAvailableAsync())
    {
        T nextToProcess = await bufferBlock.ReceiveAsync();
        // use WriteAsync to send to the serial port:
        await WriteAsync(nextToProcess);
    }
    // if here: no more data to process. Return
}

Сега всичко, от което се нуждаем, е една процедура, която създава двете нишки и изчаква, докато двете задачи приключат:

private async Task ProduceConsumeAsync()
{
    var taskProducer = ProduceAsync();
    // while the producer is busy producing, you can start the consumer:
    var taskConsumer = ConsumeAsync();
    // while both tasks are busy, you can do other things,
    // like keep the UI responsive
    // after a while you need to be sure the tasks are finished:
    await Task.WhenAll(new Task[] {taskProducer, taskConsumer});
}

Забележка: поради bufferBlock не е проблем, че производителят вече произвежда, докато потребителят все още не е стартиран.

Всичко, от което се нуждаем, е функция, която стартира async, ако имате манипулатор на събития, просто го декларирайте async:

private async void OnButton1_clicked(object sender, ...)
{
    await ProduceConsumeAsync()
}

Ако нямате асинхронна функция, трябва сами да създадете задача:

private void MyFunction()
{
    // start produce consume:
    var myTask = Task.Run( () => ProduceConsumeAsync());
    // while the task is running, do other things.
    // when you need the task to finish:
    await myTask;
 }

Повече информация за модела потребител - производител. Вижте MSDN

Как да: Приложите модел на поток от данни производител-потребител

person Harald Coppoolse    schedule 10.08.2015
comment
Благодаря ти много! Напълно си прав с използването на този модел. Също така ви благодаря за вашето много подробно описание на този сценарий! Въпреки това използвам NModbus за достъп до серийния порт. Тази библиотека вече предлага асинхронни методи за достъп до серийния порт. Единственото нещо, което трябва да управлявам, е да се предоставя само един достъп наведнъж. Въпреки това, този модел би бил много подходящ за самия NModbus за прилагане на семантиката "първи в първи излязъл". За моя случай (над NModbus) този модел не би бил подходящ, тъй като нямам „базов“ метод като метода „WriteAsync“ във вашия пример. - person Daniel; 10.08.2015
comment
Във вашия случай единствената разлика ще бъде, че нямате нужда от функцията Write(T), WriteAsync(T) ще извика вашата асинхронна функция NModBus или виждам нещата твърде прости? - person Harald Coppoolse; 10.08.2015
comment
Е, не е толкова просто, тъй като функциите за запис на NModbus имат различни параметри (не „само“ обект на съобщение или нещо подобно), но това е причината, поради която съжалявам, че би било много подходящо вътре в NModBus, на слоя, където „истинският“ Write (обект съобщение) функция е. Друг въпрос, който може да възникне, е тип връщане. С модела производител-потребител няма лесен подход за справяне с типове връщане или греша? - person Daniel; 10.08.2015
comment
Искате ли връщане в резултат на пълната обработка или резултат на обработен артикул? Нека потребителят върне искания резултат. Не забравяйте, че ако дадена функция върне TResult, асинхронната версия ще върне Task<TResult›. След await Task.WhenAll можете да проверите свойството Result на taskConsumer, което съдържа върнатата стойност TResult. Друг метод: оставете потребителя да произведе резултатите в буферен блок и оставете някой да консумира този изход - person Harald Coppoolse; 10.08.2015
comment
Да, имах предвид тип връщане на обработен артикул. Оставянето на потребителя да постави своите резултати в друг BufferBlock може да работи, но тогава трябва да „картографирам“ по някакъв начин входните елементи (в моя случай задачи) с изходните резултати. Въпреки това може да намеря просто решение за първоначалния си проблем, вижте Актуализация. - person Daniel; 10.08.2015

След като си играх с различни неща, току-що намерих просто решение, което би трябвало да е достатъчно за мен и е донякъде подобно на решението на Матиас Фидемрайзер:

private static ConcurrentQueue<Task> Tasks { get; } = new ConcurrentQueue<Task>();

public async static Task RunAlone(this Task task)
{
    Tasks.Enqueue(task);

    do
    {
        var nextTask = Tasks.First();

        if (nextTask == task)
        {
            nextTask.Start();
            await nextTask;
            Task deletingTask;
            Tasks.TryDequeue(out deletingTask);
            break;
        }
        else
        {
            nextTask.Wait();
        }
    } while (Tasks.Any());
}

public async static Task<TResult> RunAlone<TResult>(this Task<TResult> task)
{
    TResult result = default(TResult);
    Tasks.Enqueue(task);

    do
    {
        var nextTask = Tasks.First();

        if (nextTask == task)
        {
            nextTask.Start();
            result = await (Task<TResult>)nextTask;
            Task deletingTask;
            Tasks.TryDequeue(out deletingTask);
            break;
        }
        else
        {
            nextTask.Wait();
        }
    } while (Tasks.Any());

    return result;
}
person Daniel    schedule 14.08.2015