Хороший подход для запуска задач синхронно с FIFO?

В настоящее время я делаю свои первые шаги с async/await и задачами в .NET, и я очень взволнован тем, как легко запускать вещи асинхронно! Однако на данный момент мне приходится общаться с устройствами через SerialPort. Поскольку одновременно возможно только одно соединение, я просто написал несколько методов расширения для запуска всех этих методов, поступающих из разных задач/потоков, синхронно и в порядке поступления:

public static class Extensions
{
    private readonly static object LockObject = new object();

    public static Task<TResult> RunAfter<TResult>(this Task<TResult> task, ConcurrentQueue<Task> others)
        => (Task<TResult>)task.RunAllSynchronously(others);

    public static Task RunAfter(this Task task, ConcurrentQueue<Task> others)
        => task.RunAllSynchronously(others);

    private static Task RunAllSynchronously(this Task task, ConcurrentQueue<Task> others)
    {
        if (others == null) throw new ArgumentNullException("The value of " + nameof(others) + " is null!");
        lock (LockObject)
        {
            others.Enqueue(task);
            Task currentTask;
            while (others.TryDequeue(out currentTask))
            {
                currentTask.RunSynchronously();
                if (currentTask == task) break;
            }
        }
        return task;
    }
}

Такой подход кажется хорошим или в таком случае следует рассматривать по-другому?


person Daniel    schedule 10.08.2015    source источник
comment
Вы запускаете все синхронно. Зачем вообще использовать задачи? Я бы посоветовал прочитать это о том, как реально использовать преимущества асинхронности с последовательным портом.   -  person Yuval Itzchakov    schedule 10.08.2015
comment
THX, я обязательно прочитаю эту статью! Странно, почему я его еще не нашел...   -  person Daniel    schedule 10.08.2015
comment
Даниил, прежде всего, вы должны ответить на свой вопрос, добавив ответ, а не как часть своего вопроса. Во-вторых, ваше решение похоже на окончательную форму моего. Вы должны либо отметить мой как правильный, либо просто добавить свой собственный ответ...   -  person Matías Fidemraizer    schedule 13.08.2015
comment
Получилось, спасибо за совет ;)   -  person Daniel    schedule 14.08.2015


Ответы (3)


Почему вы запускаете их синхронно?

Вы должны запускать задачи асинхронно и использовать async и await для выполнения их одну за другой:

 Task currentTask;
 while (others.TryDequeue(out currentTask))
 {
      await currentTask;
      if (currentTask == task) break;
 }

С другой стороны, глядя на ваш код, я вообще не могу найти причину для использования lock (синхронизация потоков). Вы синхронизируете потоки с некоторым общим ресурсом (то есть с некоторым объектом, который может или не может быть прочитан/изменен более чем потоком). Ваш метод может быть переработан для:

private static async Task RunAllAsync(this Task task, ConcurrentQueue<Task> others)
{
    // Design by contract rocks ;)
    // See:  https://msdn.microsoft.com/en-us/library/dd264808(v=vs.110).aspx
    Contracts.Requires(task != null);
    Contracts.Requires(others != null);

    others.Enqueue(task);

    // See how I've improved your loop. Since ConcurrentQueue.TryDequeue
    // will return false if other thread has called it already, your loop
    // should try to dequeue again until it returns true, and it should
    // break if dequeued task is the task against which the extension method
    // was called or the concurrent queue has no more items, to prevent a 
    // possible infinite loop
    do
    { 
       Task currentTask;
       if(others.TryDequeue(out currentTask))
          await currentTask;

    }
    while (currentTask == task || others.Count > 0);

    return task;
}

Обновлять

ОП сказал:

Возможно, я забыл сказать, что ConcurrentQueue — это ресурс, который должен быть разделен между потоками. т.е. Task.RunAllSynchronously() вызывается для каждой новой задачи (доступ к SerialPort), и этот вызов может исходить из другого потока. Кроме того, я не могу гарантировать, что RunAllSynchronously() вызывается только что, когда все текущие (или поставленные в очередь) задачи завершены (я мог, но поэтому мне пришлось использовать что-то вроде блокировки вне метода расширения, что на самом деле не так уж приятно иметь метод расширения.

Вот почему вы используете ConcurrentQueue<T>. Потокобезопасность управляется внутри. Если вы вызываете ConcurrentQueue<T>.TryDequeue и несколько потоков вызывают его одновременно, выиграет только один, а остальные получат false в качестве возвращаемого значения, а параметр out не будет назначен. Посмотрите, что говорит MSDN по этому поводу:

ConcurrentQueue обрабатывает всю синхронизацию внутри себя. Если два потока вызывают TryDequeue в один и тот же момент, ни одна операция не блокируется. Когда между двумя потоками обнаруживается конфликт, один поток должен попытаться снова получить следующий элемент, а синхронизация выполняется внутри.

TryDequeue пытается удалить элемент из очереди. Если метод выполнен успешно, элемент удаляется, и метод возвращает значение true; в противном случае возвращается ложь. Это происходит атомарно по отношению к другим операциям в очереди. Если очередь была заполнена таким кодом, как q.Enqueue("a"); q.Enqueue("b"); q.Enqueue("c"); и два потока одновременно пытаются удалить элемент из очереди, один поток удалит из очереди a, а другой поток удалит из очереди b. Оба вызова TryDequeue вернут true, потому что они оба смогли удалить элемент из очереди. Если каждый поток вернется к удалению из очереди дополнительного элемента, один из потоков удалит из очереди c и вернет true, тогда как другой поток обнаружит, что очередь пуста, и вернет false.

person Matías Fidemraizer    schedule 10.08.2015
comment
@YuvalItzchakov Я собирался отредактировать, чтобы указать, что я не нахожу причин использовать lock в коде OP .. - person Matías Fidemraizer; 10.08.2015
comment
@YuvalItzchakov Готово! - person Matías Fidemraizer; 10.08.2015
comment
Пробуя разные вещи с задачами и асинхронностью, я заметил, что вы не можете использовать ожидание внутри блока блокировки. В этом случае мне действительно нужно использовать блокировку, не так ли? - person Daniel; 10.08.2015
comment
@Daniel Нет необходимости в блокировке - вы работаете только с локальными и потокобезопасными объектами. Если вам не нужно гарантировать, что одновременно выполняется только одна задача, блокировка не нужна. И если вам это нужно, используйте вместо этого SemaphoreSlim. - person Luaan; 10.08.2015
comment
@ Луан, спасибо. Возможно, я забыл сказать, что ConcurrentQueue — это ресурс, который должен быть разделен между потоками. т.е. Task.RunAllSynchronously() вызывается для каждой новой задачи (доступ к SerialPort), и этот вызов может исходить из другого потока. Кроме того, я не могу гарантировать, что RunAllSynchronously() просто вызывается, когда все текущие (или поставленные в очередь) задачи завершены (я мог, но поэтому мне пришлось использовать что-то вроде блокировки вне метода расширения, что на самом деле не так уж приятно иметь метод расширения. - person Daniel; 10.08.2015
comment
@Luaan Я только что нашел способ использовать что-то вроде SemaphoreSlim уже (msdn.microsoft.com/en-us/library/ee789351%28v=vs.110%29.aspx), но для меня использование методов расширения выглядит немного красивее. - person Daniel; 10.08.2015
comment
@Daniel Весь смысл ConcurrentQueue заключается в обеспечении потокобезопасной очереди - она ​​явно предназначена для работы без блокировок. И будьте осторожны с RunSynchronously, это не обязательно встроенный запуск. Возможно, написание собственного TaskScheduler было бы лучшим решением? Не совсем очевидно, что вы пытаетесь сделать. - person Luaan; 10.08.2015
comment
@ Даниэль, я повторно обновил ответ. Проверьте переработанный метод расширения и то, как do while будет работать лучше в вашем сценарии;) - person Matías Fidemraizer; 10.08.2015
comment
Спасибо вам обоим! Изначально проблема заключалась в том, что я могу разрешить только один доступ к SerialPort одновременно. Поэтому я подумал, что смогу справиться с этим, используя методы расширения, чтобы запускать только одну задачу за другой, а не параллельно. Оглядываясь назад, реализация собственного TaskScheduler кажется хорошей идеей, так как я явно хочу ограничить доступ к одной единственной задаче (мне не важен порядок - просто первый пришел первый вышел). Я бы проголосовал за ваши посты, но не могу из-за меньшей репутации. - person Daniel; 10.08.2015
comment
@MatíasFidemraizer спасибо;) Использование do while действительно лучше =) - person Daniel; 10.08.2015

Прежде всего:

Вы получаете выгоду от async-await только в том случае, если вашей программе есть что делать во время выполнения ваших задач.

Если бы ваш основной поток запускал задачу и ничего не делал, кроме как ждал завершения этой задачи, ваш основной поток мог бы выполнить эту работу сам. Это было бы даже быстрее.

В вашем примере я могу представить, что отправка по последовательной линии значительно медленнее, чем ваша обработка. Поэтому я могу представить, что пока один поток занят отправкой данных по последовательной линии, ваш поток может быть занят созданием следующих данных, которые должны быть отправлены. Или, может быть, 10 потоков создают данные, которые должны быть отправлены один за другим. Конечно, в последнем случае не гарантируется порядок отправки данных.

Но давайте посмотрим на это проще: один поток создает данные на своей скорости, а другой поток независимо отправляет данные по последовательной линии.

Это говорит о модели производитель-потребитель: один поток является производителем, он производит элементы, которые считывает и обрабатывает потребитель. Через некоторое время производитель сообщает потребителю, что никаких данных больше не ожидается.

Ключевым объектом в этом является System.Threading.Tasks.Dataflow.BufferBlock. См. MSDN. В разделе примечаний говорится, что он распространяется через nuget.

В bufferBlock реализованы два интерфейса:

  • ITargetBlock<T>, чтобы производитель отправлял свои выходные данные
  • ISourceBlock<T>, чтобы потребитель мог читать входные данные.

Предположим, вы используете System.IO.Ports.SerialPort для отправки данных. Увы, у этого класса нет поддержки асинхронности, поэтому нам придется создать его самостоятельно. Предположим, вы хотите преобразовать объекты типа T в формат, который можно отправить по последовательной линии. Код будет выглядеть следующим образом:

private void Write(T t)
{
    var dataToSend = ConvertToData(t);
    serialPort.Write(dataToSend);
}

Это не очень асинхронно. Итак, давайте сделаем из него асинхронную функцию:

private async Task WriteAsync(T t)
{
    return await Task.Run ( () =>
    {
        var dataToSend = ConvertToData(t);
        serialPort.Write(dataToSend);
    }
}

Или вы можете просто вызвать другую функцию записи:

return await Task.Run ( () => Write(t));

Примечание: если вы убедитесь, что только один поток будет использовать эту функцию, вам не нужно его блокировать.

Теперь, когда у нас есть асинхронная функция для отправки объектов типа T по последовательной линии, давайте создадим производителя, который будет создавать объекты типа T и отправлять их в буферный блок.

Я сделаю это асинхронным, чтобы вызывающий поток мог заниматься другими делами, пока генерируются данные:

private BufferBlock<T> bufferBlock = new BufferBlock<T>();

private async Task ProduceAsync()
{
    while (objectsToProcessAvailable())
    {
        T nextObject = GetNextObjectToProcess()
        await bufferBlock.SendAsync(nextObject);
    }
    // nothing to process anymore: mark complete:
    bufferBlock.Complete();
}

Принимающая сторона будет выполняться другим потоком:

private Task ConsumeAsync()
{
    // as long as there is something to process: fetch it and process it
    while (await bufferBlock.OutputAvailableAsync())
    {
        T nextToProcess = await bufferBlock.ReceiveAsync();
        // use WriteAsync to send to the serial port:
        await WriteAsync(nextToProcess);
    }
    // if here: no more data to process. Return
}

Теперь все, что нам нужно, — это одна процедура, которая создает два потока и ждет завершения обеих задач:

private async Task ProduceConsumeAsync()
{
    var taskProducer = ProduceAsync();
    // while the producer is busy producing, you can start the consumer:
    var taskConsumer = ConsumeAsync();
    // while both tasks are busy, you can do other things,
    // like keep the UI responsive
    // after a while you need to be sure the tasks are finished:
    await Task.WhenAll(new Task[] {taskProducer, taskConsumer});
}

Примечание: из-за bufferBlock не проблема, что производитель уже производит, а потребитель еще не запущен.

Все, что нам нужно, это функция, которая запускает асинхронность, если у вас есть обработчик событий, просто объявите его асинхронным:

private async void OnButton1_clicked(object sender, ...)
{
    await ProduceConsumeAsync()
}

Если у вас нет асинхронной функции, вам нужно создать задачу самостоятельно:

private void MyFunction()
{
    // start produce consume:
    var myTask = Task.Run( () => ProduceConsumeAsync());
    // while the task is running, do other things.
    // when you need the task to finish:
    await myTask;
 }

Дополнительные сведения о шаблоне потребитель-производитель. См. MSDN

Как реализовать шаблон потока данных "производитель-потребитель"

person Harald Coppoolse    schedule 10.08.2015
comment
Большое спасибо! Вы совершенно правы, используя этот шаблон. Также спасибо за очень подробное описание этого сценария! Однако я использую NModbus для доступа к последовательному порту. Эта библиотека уже предлагает асинхронные методы для доступа к последовательному порту. Единственное, чем мне приходится управлять, это то, что одновременно предоставляется только один доступ. Тем не менее, этот шаблон был бы очень подходящим для самого NModbus для реализации семантики «первым пришел — первым обслужен». Для моего случая (поверх NModbus) этот шаблон не подходит, так как у меня нет «базового» метода, такого как метод «WriteAsync» в вашем примере. - person Daniel; 10.08.2015
comment
В вашем случае единственная разница будет заключаться в том, что вам не нужна функция Write (T), WriteAsync (T) будет вызывать вашу асинхронную функцию NModBus, или я вижу все слишком просто? - person Harald Coppoolse; 10.08.2015
comment
Ну, это не так просто, так как функции записи NModbus имеют разные параметры (не «только» объект сообщения или что-то подобное), но по этой причине я сожалею, что это было бы очень уместно внутри NModBus, на уровне, где «настоящая» запись (объект сообщение) функция есть. Другой вопрос, который может возникнуть, — это тип возвращаемого значения. В шаблоне «производитель-потребитель» нет простого подхода к работе с возвращаемыми типами, или я ошибаюсь? - person Daniel; 10.08.2015
comment
Вы хотите возврат в результате полной обработки или результат по обработанному элементу? Пусть потребитель возвращает запрошенный результат. Помните, что если функция вернет TResult, асинхронная версия вернет Task<TResult›. После await Task.WhenAll вы можете проверить свойство Result задачиConsumer, которое содержит возвращаемое значение TResult. Другой метод: пусть потребитель создает результаты в буферном блоке, а кто-то использует этот вывод. - person Harald Coppoolse; 10.08.2015
comment
Да, я имел в виду возвращаемый тип для обрабатываемого элемента. Позволить потребителю поместить свои результаты в другой BufferBlock может сработать, но тогда мне нужно каким-то образом «сопоставить» входные элементы (задачи в моем случае) с выходными результатами. Тем не менее, я мог бы найти простое решение для моей первоначальной проблемы, см. Обновление. - person Daniel; 10.08.2015

Поиграв с разными вещами, я просто нашел простое решение, которого мне должно хватить, и оно чем-то похоже на решение Матиас Фидемрайзер:

private static ConcurrentQueue<Task> Tasks { get; } = new ConcurrentQueue<Task>();

public async static Task RunAlone(this Task task)
{
    Tasks.Enqueue(task);

    do
    {
        var nextTask = Tasks.First();

        if (nextTask == task)
        {
            nextTask.Start();
            await nextTask;
            Task deletingTask;
            Tasks.TryDequeue(out deletingTask);
            break;
        }
        else
        {
            nextTask.Wait();
        }
    } while (Tasks.Any());
}

public async static Task<TResult> RunAlone<TResult>(this Task<TResult> task)
{
    TResult result = default(TResult);
    Tasks.Enqueue(task);

    do
    {
        var nextTask = Tasks.First();

        if (nextTask == task)
        {
            nextTask.Start();
            result = await (Task<TResult>)nextTask;
            Task deletingTask;
            Tasks.TryDequeue(out deletingTask);
            break;
        }
        else
        {
            nextTask.Wait();
        }
    } while (Tasks.Any());

    return result;
}
person Daniel    schedule 14.08.2015