Более простое решение, чем поток данных TPL для параллельного удаления асинхронных BLOB-объектов.

Я реализую рабочую роль в Azure, которой необходимо удалять большие двоичные объекты из хранилища Azure. Предположим, что мой список больших двоичных объектов содержит около 10 000 элементов.

Вероятно, самым простым синхронным подходом будет:

Parallel.ForEach(list, x => ((CloudBlob) x).Delete());

Требования:

  • Я хочу реализовать то же самое асинхронно (в одном потоке).

  • Я хочу ограничить количество одновременных подключений до 50, поэтому я буду выполнять удаление 10 000, когда одновременно выполняется только 50 асинхронных подключений. Если одно удаление завершено, можно начать новое.

Решение?

Пока что, прочитав этот вопрос и этого, кажется, что TPL Dataflow — это то, что нужно.

Это такая простая проблема, и поток данных кажется излишним. Есть ли более простая альтернатива?

Если нет, то как это будет реализовано с использованием потока данных? Насколько я понимаю, мне нужен один блок действий, который выполняет удаление async (нужно ли мне await?). При создании моего блока я должен установить MaxDegreeOfParallelism на 50. Затем мне нужно опубликовать свои 10 КБ BLOB-объектов из списка в блок, а затем выполнить с block.Completion.Wait(). Это правильно?


person talkol    schedule 08.09.2013    source источник


Ответы (2)


Для чего-то такого простого SemaphoreSlim должно быть достаточно. TPL Dataflow великолепен, особенно если вы хотите ограничить работу в одной части более крупного конвейера. Однако в вашем сценарии это больше похоже на то, что у вас действительно есть только одно действие, которое вам нужно регулировать.

Сделать это асинхронно довольно просто:

var semaphore = new SemaphoreSlim(50);
var tasks = list.Cast<CloudBlob>().Select(async x =>
{
    using (await semaphore.TakeAsync())
        await x.DeleteAsync();
});
await Task.WhenAll(tasks);

где TakeAsync определяется как:

private sealed class SemaphoreSlimKey : IDisposable
{
    private readonly SemaphoreSlim _semaphore;
    public SemaphoreSlimKey(SemaphoreSlim semaphore) { _semaphore = semaphore; }
    void IDisposable.Dispose() { _semaphore.Release(); }
}

public static async Task<IDisposable> TakeAsync(this SemaphoreSlim semaphore)
{
    await semaphore.WaitAsync().ConfigureAwait(false);
    return new SemaphoreSlimKey(semaphore);
}
person Stephen Cleary    schedule 08.09.2013
comment
Поскольку я работаю в рабочей роли, у меня нет асинхронного контекста. Нужна ли мне ваша библиотека Nito.AsyncEx, описанная здесь stackoverflow.com/questions/9208921/ или я могу просто сделать Task.WaitAll(tasks) вместо await Task.WhenAll(tasks), так как я хочу заблокировать свой единственный рабочий поток, пока не будут завершены все удаления 10 КБ - person talkol; 09.09.2013
comment
Для будущих поколений Task.WhenAll(tasks).Wait(); является точным синтаксисом, поскольку WaitAll принимает только задачу [] - person talkol; 09.09.2013

Вы можете рассмотреть возможность использования планировщика заданий, ограничивающего параллелизм: http://msdn.microsoft.com/en-us/library/ee789351.aspx. Неограниченный параллелизм может привести к регулированию, поскольку вы можете мгновенно запустить DOS-сервер со слишком большим количеством одновременных запросов, и поэтому рекомендуется ограничивать это на прикладном уровне.

Обратите внимание, что клиент хранилища 2.1 поддерживает задачу с упреждающей отменой и т. д., чтобы упростить эту задачу. Код, который вы написали выше, потребует 50 потоков, поскольку он вызывает синхронный метод. Вы можете использовать новые методы DeleteAsync, чтобы сделать это полностью асинхронно в одном потоке. Если бы вы использовали асинхронное ожидание, поддержка 50 одновременных запросов была бы довольно простой, поскольку вы могли бы выполнить ожидание любого в цикле, просто добавить дополнительный рабочий элемент и т. д.

В этом докладе рассматриваются некоторые передовые методы, которые могут быть вам полезны: http://channel9.msdn.com/Events/TechEd/NorthAmerica/2013/WAD-B406

person Joe Giardino    schedule 16.09.2013