Запись в файл по одной строке сама по себе затратна, даже если вам не нужно каждый раз открывать поток. Сохранение файлового потока открытым имеет и другие проблемы, поскольку файловые потоки всегда буферизуются, начиная с уровня FileStream
и заканчивая драйвером файловой системы, по соображениям производительности. Вам придется периодически сбрасывать поток, чтобы гарантировать, что данные были записаны на диск.
Чтобы действительно улучшить производительность, вам придется группировать записи, например, с помощью BatchBlock. Как только вы это сделаете, стоимость открытия потока станет незначительной.
Строки также должны генерироваться в последний возможный момент, чтобы избежать создания временных строк, которые необходимо будет удалить сборщиком мусора. При n * 1M записей накладные расходы памяти и ЦП на эти выделения и сборку мусора будут значительными.
Запись журналов библиотек в пакетном режиме перед записью, чтобы избежать снижения производительности.
Вы можете попробовать что-то вроде этого:
var batchBlock=new BatchBlock<Record>(1000);
var writerBlock=new ActionBlock<Record[]>( records => {
//Create or open a file for appending
using var writer=new StreamWriter(ThePath,true);
foreach(var record in records)
{
writer.WriteLine("{0} = {1} :{2}",record.Prop1, record.Prop5, record.Prop2);
}
});
batchBlock.LinkTo(writerBlock,options);
или, используя асинхронные методы
var batchBlock=new BatchBlock<Record>(1000);
var writerBlock=new ActionBlock<Record[]>(async records => {
//Create or open a file for appending
await using var writer=new StreamWriter(ThePath,true);
foreach(var record in records)
{
await writer.WriteLineAsync("{0} = {1} :{2}",record.Prop1, record.Prop5, record.Prop2);
}
});
batchBlock.LinkTo(writerBlock,options);
Вы можете настроить размер пакета и размер буфера StreamWriter для достижения оптимальной производительности.
Создание фактического блока, который записывает в поток
Пользовательский блок можно создать с помощью метода, показанного в Прохождение пользовательского блока потока данных — вместо создания фактического пользовательского блока создайте что-то, что возвращает все, что необходимо для работы LinkTo
, в данном случае ITargetBlock< T>
:
ITargetBlock<Record> FileExporter(string path)
{
var writer=new StreamWriter(path,true);
var block=new ActionBlock<Record>(async msg=>{
await writer.WriteLineAsync("{0} = {1} :{2}",record.Prop1, record.Prop5, record.Prop2);
});
//Close the stream when the block completes
block.Completion.ContinueWith(_=>write.Close());
return (ITargetBlock<Record>)target;
}
...
var exporter1=CreateFileExporter(path1);
previous.LinkTo(exporter,options);
Хитрость здесь в том, что поток создается вне блока и остается активным до тех пор, пока блок не завершится. Это не сборщик мусора, потому что он используется другим кодом. Когда блок завершается, нам нужно явно закрыть его, что бы ни случилось. block.Completion.ContinueWith(_=>write.Close());
закроет поток независимо от того, корректно ли завершен блок или нет.
Это тот же код, который использовался в пошаговом руководстве для закрытия выходного BufferBlock :
target.Completion.ContinueWith(delegate
{
if (queue.Count > 0 && queue.Count < windowSize)
source.Post(queue.ToArray());
source.Complete();
});
Потоки буферизуются по умолчанию, поэтому вызов WriteLine
не означает, что данные действительно будут записываться на диск. Это означает, что мы не знаем, когда данные действительно будут записаны в файл. В случае сбоя приложения некоторые данные могут быть потеряны.
Память, ввод-вывод и накладные расходы
При работе с 1 млн строк в течение значительного периода времени все складывается. Например, можно использовать File.AppendAllLinesAsync
для одновременной записи пакетов строк, но это приведет к выделению 1M временных строк. На каждой итерации среда выполнения должна будет использовать как минимум оперативную память для этих временных строк в качестве пакета. Использование оперативной памяти начинало увеличиваться до сотен МБ, а затем до ГБ, прежде чем сборщик мусора замораживал потоки.
С 1 миллионом строк и большим количеством данных трудно отлаживать и отслеживать данные в конвейере. Если что-то пойдет не так, все может зависнуть очень быстро. Представьте, например, что 1 млн сообщений застряли в блоке one из-за того, что одно сообщение было заблокировано.
Важно (из соображений здравомыслия и производительности) максимально упростить отдельные компоненты в конвейере.
person
Panagiotis Kanavos
schedule
14.07.2021
to n BufferBlock/ActionBlock
почему? ActionBlock уже имеет входной BufferBlock - person Panagiotis Kanavos   schedule 14.07.2021ActionBlock already has an input BufferBlock
да, но природа BroadcastBlock такова, что доставка не гарантируется, если ActionBlock отстает. Я буду работать на серверах с большим объемом оперативной памяти, где размер буфера не имеет значения. - person amonroejj   schedule 14.07.2021I will be running on high-RAM servers
почему тогда вы пытаетесь не добавлять строку для каждого сообщения? Ответ: вы заботитесь о IO. И несколько операций ввода-вывода всегда медленнее, чем одна пакетная операция. - person Panagiotis Kanavos   schedule 14.07.2021multiple IO operations are always slower than a single batch operation
это не имеет смысла (для меня).WriteLine()
вызывается одинаковое количество раз. - person amonroejj   schedule 14.07.2021By default the ActionBlock has no capacity limit.
Вы правы. Я оглянулся на старую тестовую программу, которую написал несколько месяцев назад для проверки (и на собственном опыте) поведения BroadcastBlock без гарантированной доставки. Я заметил, что мой ActionBlock имеет явный набор BoundedCapacity. Мне не нужно было бы устанавливать BoundedCapacity в моем текущем проекте. - person amonroejj   schedule 14.07.2021File.AppendAllTextAsync
. Во всех случаях код намного проще и безопаснее, чем обработка долгоживущего потока. - person Panagiotis Kanavos   schedule 14.07.2021What does your code really do?
Я упростил это для примера, но технически DataReader обернут как IEnumerable класса POCO. Я поворачиваю длинные данные к широким, поэтому цикл по POCO должен иметь состояние, чтобы знать, когда пришло время начать новую широкую выходную строку. Аспект одного POCO, подающего несколько выходных текстовых файлов, по-прежнему актуален. Я хочу выполнить итерацию POCO только один раз, независимо от количества выходных текстовых файлов, потому что запрос, стоящий за DataReader, — это действительно тяжелая работа. - person amonroejj   schedule 14.07.2021