Поток данных TPL: ActionBlock, который позволяет избежать многократного запуска блока использования (например, для записи в StreamWriter) при каждом вызове своего делегата.

Мне нужно прочитать 1 млн строк из IDataReader и одновременно записать n текстовых файлов. Каждый из этих файлов будет другим подмножеством доступных столбцов; все n текстовые файлы будут состоять из 1 миллиона строк.

Текущий план — один TransformManyBlock для итерации IDataReader, связанный с BroadcastBlock, связанный с парами n BufferBlock/ActionBlock.

Чего я пытаюсь избежать, так это того, чтобы мой делегат ActionBlock выполнял using (StreamWriter x...) { x.WriteLine(); }, который открывал бы и закрывал каждый выходной файл миллион раз.

Моя текущая мысль заключается в том, чтобы вместо ActionBlock написать собственный класс, реализующий ITargetBlock‹›. Есть ли более простой подход?

РЕДАКТИРОВАТЬ 1: Обсуждение имеет значение для моей текущей проблемы, но ответы до сих пор были слишком сосредоточены на поведении файловой системы. Для будущих искателей суть вопроса заключалась в том, как построить какую-то установку/разборку вне делегата ActionBlock. Это применимо к любому виду одноразовых предметов, которые вы обычно заворачиваете в блок использования.

РЕДАКТИРОВАТЬ 2: Согласно @Panagiotis Kanavos, краткое изложение решения состоит в том, чтобы настроить объект перед определением блока, а затем разобрать объект в блоке Completion.ContinueWith.


person amonroejj    schedule 13.07.2021    source источник
comment
Не лучше ли пропустить поток данных TPL и использовать пары n BlockingCollection/Task.Run?   -  person amonroejj    schedule 13.07.2021
comment
Что на самом деле делает ваш код? В TPL Datflow нет ничего плохого. На самом деле, поскольку каждый блок по умолчанию использует только одну задачу, вы даже можете использовать FileStream, созданный вне блока. Однако, если вам нужно написать 1 миллион строк, лучшим решением будет их группирование и запись всей партии за один раз, а не запись построчно.   -  person Panagiotis Kanavos    schedule 14.07.2021
comment
to n BufferBlock/ActionBlock почему? ActionBlock уже имеет входной BufferBlock   -  person Panagiotis Kanavos    schedule 14.07.2021
comment
ActionBlock already has an input BufferBlock да, но природа BroadcastBlock такова, что доставка не гарантируется, если ActionBlock отстает. Я буду работать на серверах с большим объемом оперативной памяти, где размер буфера не имеет значения.   -  person amonroejj    schedule 14.07.2021
comment
По умолчанию ActionBlock не имеет ограничения емкости. Вы ничего не получите, добавив еще один BufferBlock. У него будут те же проблемы, что и у ActionBlock. Если вы хотите гарантировать доставку, вам придется написать дополнительный код для отправки сообщения адресатам.   -  person Panagiotis Kanavos    schedule 14.07.2021
comment
I will be running on high-RAM servers почему тогда вы пытаетесь не добавлять строку для каждого сообщения? Ответ: вы заботитесь о IO. И несколько операций ввода-вывода всегда медленнее, чем одна пакетная операция.   -  person Panagiotis Kanavos    schedule 14.07.2021
comment
multiple IO operations are always slower than a single batch operation это не имеет смысла (для меня). WriteLine() вызывается одинаковое количество раз.   -  person amonroejj    schedule 14.07.2021
comment
By default the ActionBlock has no capacity limit. Вы правы. Я оглянулся на старую тестовую программу, которую написал несколько месяцев назад для проверки (и на собственном опыте) поведения BroadcastBlock без гарантированной доставки. Я заметил, что мой ActionBlock имеет явный набор BoundedCapacity. Мне не нужно было бы устанавливать BoundedCapacity в моем текущем проекте.   -  person amonroejj    schedule 14.07.2021
comment
Но соответствует ли это фактическому вводу-выводу? Опять же, файловый поток буферизуется. Записывая все сразу, вы гарантируете, что данные действительно попадут на диск. IO происходит только тогда, когда буфер заполнен, и если вам нужно, например, 3 или 4 записи по 8 КБ, вы можете построить полную строку с помощью StringBuilder и записать ее всю с помощью File.AppendAllTextAsync. Во всех случаях код намного проще и безопаснее, чем обработка долгоживущего потока.   -  person Panagiotis Kanavos    schedule 14.07.2021
comment
Несколько актуально: BroadcastBlock с гарантированной доставкой в ​​потоке данных TPL   -  person Theodor Zoulias    schedule 14.07.2021
comment
@amonroejj Я добавил функцию, которая использует один поток для всех сообщений, но это делает риск потери неписаных данных и блокирует файл на время жизни конвейера.   -  person Panagiotis Kanavos    schedule 14.07.2021
comment
What does your code really do? Я упростил это для примера, но технически DataReader обернут как IEnumerable класса POCO. Я поворачиваю длинные данные к широким, поэтому цикл по POCO должен иметь состояние, чтобы знать, когда пришло время начать новую широкую выходную строку. Аспект одного POCO, подающего несколько выходных текстовых файлов, по-прежнему актуален. Я хочу выполнить итерацию POCO только один раз, независимо от количества выходных текстовых файлов, потому что запрос, стоящий за DataReader, — это действительно тяжелая работа.   -  person amonroejj    schedule 14.07.2021


Ответы (2)


Запись в файл по одной строке сама по себе затратна, даже если вам не нужно каждый раз открывать поток. Сохранение файлового потока открытым имеет и другие проблемы, поскольку файловые потоки всегда буферизуются, начиная с уровня FileStream и заканчивая драйвером файловой системы, по соображениям производительности. Вам придется периодически сбрасывать поток, чтобы гарантировать, что данные были записаны на диск.

Чтобы действительно улучшить производительность, вам придется группировать записи, например, с помощью BatchBlock. Как только вы это сделаете, стоимость открытия потока станет незначительной.

Строки также должны генерироваться в последний возможный момент, чтобы избежать создания временных строк, которые необходимо будет удалить сборщиком мусора. При n * 1M записей накладные расходы памяти и ЦП на эти выделения и сборку мусора будут значительными.

Запись журналов библиотек в пакетном режиме перед записью, чтобы избежать снижения производительности.

Вы можете попробовать что-то вроде этого:

var batchBlock=new BatchBlock<Record>(1000);
var writerBlock=new ActionBlock<Record[]>( records => {
   
    //Create or open a file for appending
    using var writer=new StreamWriter(ThePath,true);
    foreach(var record in records)
    {
        writer.WriteLine("{0} = {1} :{2}",record.Prop1, record.Prop5, record.Prop2);
    }

});

batchBlock.LinkTo(writerBlock,options);

или, используя асинхронные методы

var batchBlock=new BatchBlock<Record>(1000);
var writerBlock=new ActionBlock<Record[]>(async records => {
   
    //Create or open a file for appending
    await using var writer=new StreamWriter(ThePath,true);
    foreach(var record in records)
    {
        await writer.WriteLineAsync("{0} = {1} :{2}",record.Prop1, record.Prop5, record.Prop2);
    }

});

batchBlock.LinkTo(writerBlock,options);

Вы можете настроить размер пакета и размер буфера StreamWriter для достижения оптимальной производительности.

Создание фактического блока, который записывает в поток

Пользовательский блок можно создать с помощью метода, показанного в Прохождение пользовательского блока потока данных — вместо создания фактического пользовательского блока создайте что-то, что возвращает все, что необходимо для работы LinkTo, в данном случае ITargetBlock< T> :

ITargetBlock<Record> FileExporter(string path)
{
    var writer=new StreamWriter(path,true);
    var block=new ActionBlock<Record>(async msg=>{
        await writer.WriteLineAsync("{0} = {1} :{2}",record.Prop1, record.Prop5, record.Prop2);
    });

    //Close the stream when the block completes
    block.Completion.ContinueWith(_=>write.Close());
    return (ITargetBlock<Record>)target;
}
...


var exporter1=CreateFileExporter(path1);
previous.LinkTo(exporter,options);

Хитрость здесь в том, что поток создается вне блока и остается активным до тех пор, пока блок не завершится. Это не сборщик мусора, потому что он используется другим кодом. Когда блок завершается, нам нужно явно закрыть его, что бы ни случилось. block.Completion.ContinueWith(_=>write.Close()); закроет поток независимо от того, корректно ли завершен блок или нет.

Это тот же код, который использовался в пошаговом руководстве для закрытия выходного BufferBlock :

target.Completion.ContinueWith(delegate
{
   if (queue.Count > 0 && queue.Count < windowSize)
      source.Post(queue.ToArray());
   source.Complete();
});

Потоки буферизуются по умолчанию, поэтому вызов WriteLine не означает, что данные действительно будут записываться на диск. Это означает, что мы не знаем, когда данные действительно будут записаны в файл. В случае сбоя приложения некоторые данные могут быть потеряны.

Память, ввод-вывод и накладные расходы

При работе с 1 млн строк в течение значительного периода времени все складывается. Например, можно использовать File.AppendAllLinesAsync для одновременной записи пакетов строк, но это приведет к выделению 1M временных строк. На каждой итерации среда выполнения должна будет использовать как минимум оперативную память для этих временных строк в качестве пакета. Использование оперативной памяти начинало увеличиваться до сотен МБ, а затем до ГБ, прежде чем сборщик мусора замораживал потоки.

С 1 миллионом строк и большим количеством данных трудно отлаживать и отслеживать данные в конвейере. Если что-то пойдет не так, все может зависнуть очень быстро. Представьте, например, что 1 млн сообщений застряли в блоке one из-за того, что одно сообщение было заблокировано.

Важно (из соображений здравомыслия и производительности) максимально упростить отдельные компоненты в конвейере.

person Panagiotis Kanavos    schedule 14.07.2021
comment
Writing to a file one line at a time is expensive in itself even when you don't have to open the stream each time. Кто-нибудь может дать ссылку на это утверждение (честный вопрос, а не сарказм)? - person amonroejj; 14.07.2021
comment
@amonroejj Это будет зависеть от ситуации. Во-первых, дороговизна — понятие относительное. Это дорого по сравнению с некоторыми вещами и не по сравнению с другими. Далее, конечно, написание файла будет во многом зависеть от деталей реализации. Если файл находится на SSD, он будет вести себя совсем иначе, чем жесткий диск или сетевой диск (и сетевой диск будет сильно различаться в зависимости от подключения). - person Servy; 14.07.2021
comment
@amonroejj, если вы не используете буферизацию, каждая запись потока приведет к операции ввода-вывода. По этой причине FileStream буферизуется по умолчанию. Это приводит к меньшему количеству операций ввода-вывода, но всегда есть вероятность, что сбой приведет к потере данных. Хранение файла заблокированным в течение длительного времени может вызвать и другие проблемы. Путем пакетной обработки вы гарантируете, что данные будут записаны, когда вы ожидаете, что и файл будет выпущен. - person Panagiotis Kanavos; 14.07.2021
comment
@amonroejj, когда вам нужно написать 1 миллион строк, все быстро складывается. Вы также теряете возможность легко отлаживать и отслеживать данные, особенно со сложными конвейерами. Вот почему я не использовал WriteAllLinesAsync — для этого потребовалось бы создание 1M временных строк. - person Panagiotis Kanavos; 14.07.2021

Часто при использовании TPL я создаю пользовательские классы, чтобы я мог создавать частные переменные-члены и частные методы, которые используются для блоков в моем конвейере, но вместо реализации ITargetBlock или ISourceBlock я просто буду иметь любые блоки, которые мне нужны, внутри моего пользовательского class, а затем я открываю ITargetBlock и/или ISourceBlock в качестве общедоступных свойств, чтобы другие классы могли использовать исходный и целевой блоки для связи друг с другом.

person TJ Rockefeller    schedule 13.07.2021