Проблема с памятью в реализации TPL Dataflow операции чтения-записи ввода-вывода

Я попытался реализовать операцию чтения и записи с использованием операций файлового ввода-вывода и инкапсулировал эти операции в TransformBlock, чтобы сделать эти операции потокобезопасными вместо использования механизма блокировки.

Но проблема в том, что когда я пытаюсь написать даже 5 файлов параллельно, возникает исключение памяти, и при использовании этой реализации он блокирует поток пользовательского интерфейса. Реализация выполнена в проекте Windows Phone. Пожалуйста, предложите, что не так в этой реализации.

Операция файлового ввода-вывода

public static readonly IsolatedStorageFile _isolatedStore = IsolatedStorageFile.GetUserStoreForApplication();
public static readonly FileIO _file = new FileIO();
public static readonly ConcurrentExclusiveSchedulerPair taskSchedulerPair = new ConcurrentExclusiveSchedulerPair();
public static readonly ExecutionDataflowBlockOptions exclusiveExecutionDataFlow 
    = new ExecutionDataflowBlockOptions
{
    TaskScheduler = taskSchedulerPair.ExclusiveScheduler,
    BoundedCapacity = 1
};

public static readonly ExecutionDataflowBlockOptions concurrentExecutionDataFlow 
    = new ExecutionDataflowBlockOptions
{
    TaskScheduler = taskSchedulerPair.ConcurrentScheduler,
    BoundedCapacity = 1
};

public static async Task<T> LoadAsync<T>(string fileName)
{
    T result = default(T);

    var transBlock = new TransformBlock<string, T>
       (async fName =>
       {
           return await LoadData<T>(fName);
       }, concurrentExecutionDataFlow);

    transBlock.Post(fileName);

    result = await transBlock.ReceiveAsync();

    return result;
}

public static async Task SaveAsync<T>(T obj, string fileName)
{
    var transBlock = new TransformBlock<Tuple<T, string>, Task>
       (async tupleData =>
       {
          await SaveData(tupleData.Item1, tupleData.Item2);
       }, exclusiveExecutionDataFlow);

    transBlock.Post(new Tuple<T, string>(obj, fileName));

    await transBlock.ReceiveAsync();
}

Использование MainPage.xaml.cs

private static string data = "vjdsskjfhkjsdhvnvndjfhjvkhdfjkgd"
private static string fileName = string.Empty;
private List<string> DataLstSample = new List<string>();
private ObservableCollection<string> TestResults = new ObservableCollection<string>();
private static string data1 = "hjhkjhkhkjhjkhkhkjhkjhkhjkhjkh";
List<Task> allTsk = new List<Task>();
private Random rand = new Random();
private string  fileNameRand
{
    get
    {
        return rand.Next(100).ToString();
    }
}

public MainPage()
{
    InitializeComponent();

    for (int i = 0; i < 5; i ++)
    {
        DataLstSample.Add((i % 2) == 0 ? data : data1);
    }

}

private void Button_Click(object sender, RoutedEventArgs e)
{
    AppIsolatedStore_TestInMultiThread_LstResultShouldBeEqual();
}

public async void AppIsolatedStore_TestInMultiThread_LstResultShouldBeEqual()
{
    TstRst.Text = "InProgress..";
    allTsk.Clear();

    foreach(var data in DataLstSample)
    {
        var fName = fileNameRand;

        var t = Task.Run(async () =>
        {
            await AppIsolatedStore.SaveAsync<string>(data, fName);
        });

        TestResults.Add(string.Format("Writing file name: {0}, data: {1}", fName, data));
        allTsk.Add(t);
    }

    await Task.WhenAll(allTsk);

    TstRst.Text = "Completed..";
}

Сохранение и загрузка данных асинхронно

        /// <summary>
        /// Load object from file
        /// </summary>
        private static async Task<T> LoadData<T>(string fileName)
        {

            T result = default(T);

            try
            {
                if (!string.IsNullOrWhiteSpace(fileName))
                {
                    using (var file = new IsolatedStorageFileStream(fileName, FileMode.OpenOrCreate, _isolatedStore))
                    {
                        var data = await _file.ReadTextAsync(file);

                        if (!string.IsNullOrWhiteSpace(data))
                        {
                            result = JsonConvert.DeserializeObject<T>(data);
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                //todo: log the megatron exception in a file
                Debug.WriteLine("AppIsolatedStore: LoadAsync : An error occured while loading data : {0}", ex.Message);
            }
            finally
            {

            }

            return result;
        }


        /// <summary>
        /// Save object from file
        /// </summary>
        private static async Task SaveData<T>(T obj, string fileName)
        {
            try
            {
                if (obj != null && !string.IsNullOrWhiteSpace(fileName))
                {
                    //Serialize object with JSON or XML serializer
                    string storageString = JsonConvert.SerializeObject(obj);

                    if (!string.IsNullOrWhiteSpace(storageString))
                    {
                        //Write content to file
                        await _file.WriteTextAsync(new IsolatedStorageFileStream(fileName, FileMode.Create, _isolatedStore), storageString);
                    }
                }
            }
            catch (Exception ex)
            {
                //todo: log the megatron exception in a file
                Debug.WriteLine("AppIsolatedStore: SaveAsync : An error occured while saving the data : {0}", ex.Message);
            }
            finally
            {
            }
        }

Изменить:

Причина, по которой у него есть исключение памяти, заключается в том, что строка данных, которую я взял, слишком велика. Строка представляет собой ссылку: http://1drv.ms/1QWSAsc

Но вторая проблема заключается в том, что если я также добавляю небольшие данные, это блокирует поток пользовательского интерфейса. Выполняет ли код какую-либо задачу в UI?


person Balraj Singh    schedule 23.12.2015    source источник


Ответы (2)


Нет, вы используете параллельную пару, которая использует пул потоков по умолчанию для своих задач, и вы создаете экземпляры задач с помощью метода Run, поэтому проблема не здесь. Но код, который у вас здесь, имеет две основные угрозы:

var transBlock = new TransformBlock<string, T>
   (async fName =>
   {
       // process file here
   }, concurrentExecutionDataFlow);

Вы действительно не должны создавать transBlock каждый раз. Основная идея TPL Dataflow заключается в том, что вы создаете блоки один раз, а затем используете их. Таким образом, вы должны реорганизовать свое приложение, чтобы уменьшить количество блоков, которые вы создаете, иначе это не тот случай, когда следует использовать TPL Dataflow.

Другая угроза в вашем коде заключается в том, что вы явно блокируете поток!

// Right here
await Task.WhenAll(allTsk);
TstRst.Text = "Completed..";

Вызов задачи await for из метода async void из синхронного обработчика событий блокирует поток, как по умолчанию он фиксирует контекст синхронизации. Прежде всего, следует избегать async void. Во-вторых, если вы асинхронны, вы должны быть асинхронны на всем пути, поэтому обработчик событий тоже должен быть асинхронным. В-третьих, вы можете использовать продолжение своей задачи для обновления пользовательского интерфейса или использовать текущий контекст синхронизации.

Итак, ваш код должен быть примерно таким:

// store the sync context in the field of your form
SynchronizationContext syncContext = SynchronizationContext.Current;

// avoid the async void :)
public async Task AppIsolatedStore_TestInMultiThread_LstResultShouldBeEqual()

// make event handler async - this is the only exception for the async void use rule from above
private async void Button_Click(object sender, RoutedEventArgs e)

// asynchronically wait the result without capturing the context
await Task.WhenAll(allTsk).ContinueWith(
  t => {
    // you can move out this logic to main method
    syncContext.Post(new SendOrPostCallback(o =>
        {
            TstRst.Text = "Completed..";
        }));
  }
);
person VMAtm    schedule 23.12.2015
comment
Я изучаю способы сделать операции ввода-вывода потокобезопасными без использования механизма блокировки. Как избежать побочных эффектов блокировки. Как вы говорите, я должен уменьшить количество создания блоков или использовать подход diff. Можете ли вы предложить способ сделать это лучше или новый способ, который я могу изучить больше. - person Balraj Singh; 23.12.2015
comment
@BalrajSingh TPL Dataflow по-прежнему использует блоки внутри. Ручные операторы lock намного читабельнее и эффективнее. - person VMAtm; 23.12.2015
comment
В последнем примере ConfigureAwait неверно — это не скомпилируется. Также не следует использовать ContinueWith. Если оператор использует поток данных TPL, более идиоматичным решением будет окончательный вариант ActionBlock с пользовательским интерфейсом TaskScheduler. - person Stephen Cleary; 24.12.2015
comment
@StephenCleary Спасибо за дополнение! Я забыл про наручи. - person VMAtm; 24.12.2015

Пробовали ли вы играть с BoundedCapacity в ExecutionDataflowBlockOptions ? В Введение в TPL говорится о емкости блока:

[...] ограничение полезно в сети потока данных, чтобы избежать неограниченного роста памяти. Это может быть очень важно с точки зрения надежности, если существует вероятность того, что производители могут в конечном итоге генерировать данные намного быстрее, чем потребители могут их обрабатывать...

Я предлагаю попробовать использовать эту опцию, чтобы ограничить очередь обработанных элементов и посмотреть, поможет ли это с вашими проблемами с памятью.

person Phil Gref    schedule 23.12.2015
comment
Я установил BoundingCapacity на 1, но проблема не устранена. - person Balraj Singh; 23.12.2015