Древовидная иерархия запросов WebClient с использованием TPL

Я разрабатываю приложение WPF на С#, где у меня есть Uri, который я хочу загрузить данные Json. Код десериализует загруженные данные Json в объект, после чего у объекта есть список Uris, для которого потребуется запросить больше данных Json, которые я хотел бы запросить параллельно. И загруженные данные Json могут иметь больше Uris для запроса. Код должен иметь возможность выполнять WebClient.CancelAsync для родительского и его дочернего WebClient, когда это необходимо. Я смотрю на библиотеку параллельных задач и затрудняюсь понять и реализовать ее. Я не уверен, следует ли использовать токен отмены TPL для вызова CancelAsync или CancelAsync для отмены токена отмены TPL. И я не уверен, следует ли мне использовать вложенные задачи для дочернего WebClient....?

Если у кого-то есть подобный сценарий и он реализовал его с использованием нового TPL... не могли бы вы поделиться фрагментом кода...

Спасибо.


person icube    schedule 12.09.2011    source источник


Ответы (2)


Если я могу предложить использовать Reactive Extensions (Rx) поверх TPL, то это можно сделать довольно легко без необходимости отмены задач и т. д.

Если я могу предположить, что у вас есть следующее:

// The initial Uri
Uri initialUri = ...;

// A function to return the JSON string from a given Uri
Func<Uri, string> getJason = ...; 

// Turn the JSON into the next set of Uris to fetch
Func<string, IEnumerable<Uri>> getUris = ...; 

Затем, используя Rx, вы превращаете эти функции в функции, которые возвращают наблюдаемые объекты с помощью пула задач, например:

Func<Uri, IObservable<string>> getJasonObsFunc = u =>
    Observable
        .FromAsyncPattern<Uri, string>(
            getJason.BeginInvoke,
            getJason.EndInvoke)
        .Invoke(u)
        .ObserveOn(Scheduler.TaskPool);

Func<string, IObservable<Uri>> getUrisObsFunc = j =>
    Observable
        .FromAsyncPattern<string, IEnumerable<Uri>>(
            getUris.BeginInvoke,
            getUris.EndInvoke)
        .Invoke(j)
        .ObserveOn(Scheduler.TaskPool)
        .SelectMany(xs => xs.ToObservable());

Вам понадобится обратный вызов, чтобы получить пары Uri/JSON. Что-то вроде этого:

Action<Uri, string> callback = (u, j) =>
    Console.WriteLine(String.Format("{0} => {1}", u, j));

Вот рекурсивный запрос LINQ, который будет рекурсивно извлекать каждую строку JSON:

Func<Uri, IObservable<Uri>> getAllUris = null;
getAllUris = u =>
    Observable
        .Return<Uri>(u)
        .Merge(
            from j in getJasonObsFunc(u).Do(k => callback(u, k))
            from u1 in getUrisObsFunc(j)
            from u2 in getAllUris(u1)
            select u2);

Затем вы вызываете все это добро, используя следующую строку:

var subscription = getAllUris(initialUri).Subscribe();

Теперь, если вы хотите отменить выполнение запроса, просто вызовите это:

subscription.Dispose();

Rx обрабатывает все задачи и отменяет их все за вас.

Надеюсь, это поможет.

Вот ссылки для Rx:

person Enigmativity    schedule 14.09.2011

Я бы рекомендовал вам избегать создания вложенных задач, потому что вы теряете контроль над количеством запущенных задач, вместо этого вы можете использовать шаблон отправки с помощью BlockingCollection, где количество рабочих задач ограничено, они отправляют работу из коллекции и могут добавлять при необходимости больше работы с ним, и когда все объекты загружены, вы вызываете CompleteAdding, чтобы разблокировать все ожидающие задачи.

person Emad Omara    schedule 14.09.2011