Почему Parallel.Foreach создает бесконечные потоки?

Приведенный ниже код продолжает создавать потоки, даже если очередь пуста... пока в конечном итоге не возникнет исключение OutOfMemory. Если я заменю Parallel.ForEach на обычный foreach, этого не произойдет. кто-нибудь знает причины, почему это может произойти?

public delegate void DataChangedDelegate(DataItem obj);

public class Consumer
{
    public DataChangedDelegate OnCustomerChanged;
    public DataChangedDelegate OnOrdersChanged;

    private CancellationTokenSource cts;
    private CancellationToken ct;
    private BlockingCollection<DataItem> queue;

    public Consumer(BlockingCollection<DataItem> queue) {
        this.queue = queue;
        Start();
    }

    private void Start() {
        cts = new CancellationTokenSource();
        ct = cts.Token;
        Task.Factory.StartNew(() => DoWork(), ct);
    }

    private void DoWork() {

        Parallel.ForEach(queue.GetConsumingPartitioner(), item => {
            if (item.DataType == DataTypes.Customer) {
                OnCustomerChanged(item);
            } else if(item.DataType == DataTypes.Order) {
                OnOrdersChanged(item);
            }
        });
    }
}

person mike01010    schedule 22.06.2012    source источник
comment
Вы пытались запустить foreach с низкой степенью параллелизма (например, 4, 8 и т. д.) и посмотреть, сохраняется ли проблема?   -  person Tudor    schedule 22.06.2012
comment
Это ограничивает количество создаваемых потоков (но я не могу идти в ногу с производителями). Что это значит об исходном коде тогда? Должен ли пул TPL управлять этим должным образом?   -  person mike01010    schedule 22.06.2012
comment
Как создать экземпляр потребителя? Возможно, вы делаете это в цикле?   -  person Panagiotis Kanavos    schedule 22.06.2012


Ответы (3)


Я думаю, что Parallel.ForEach() был создан в первую очередь для обработки ограниченных коллекций. И он не ожидает коллекций, подобных той, что возвращает GetConsumingPartitioner(), где MoveNext() блокируется на долгое время.

Проблема в том, что Parallel.ForEach() пытается найти наилучшую степень параллелизма, поэтому запускает столько Task, сколько позволяет TaskScheduler. Но TaskScheduler видит, что есть много Task, выполнение которых занимает очень много времени, и что они ничего не делают (они блокируются), поэтому он продолжает начинать новые.

Я думаю, что лучшее решение - установить MaxDegreeOfParallelism.

В качестве альтернативы вы можете использовать ActionBlock потока данных TPL. Основное отличие в этом случае состоит в том, что ActionBlock не блокирует потоки, когда нет элементов для обработки, поэтому количество потоков не приближается к пределу.

person svick    schedule 22.06.2012
comment
GetConsumingPartitioner был создан специально для использования BlockingCollection с Parallel.ForEach. ">blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx - person Panagiotis Kanavos; 22.06.2012
comment
Да, я это читал. Это позволяет избежать некоторых проблем, которые возникают у вас с GetConsumingCollection(), но не всех. - person svick; 22.06.2012
comment
Это должен быть ответ - person Saeed Neamati; 16.06.2014

Шаблон производитель/потребитель в основном используется, когда есть только один производитель и один потребитель.

Однако то, чего вы пытаетесь достичь (несколько потребителей), более точно соответствует шаблону рабочего списка. Следующий код был взят со слайда для слайда модуля 2 «2c — Shared Memory Patterns» из курса параллельного программирования, преподаваемого в Университете штата Юта, который доступен для загрузки по адресу http://ppcp.codeplex.com/

BlockingCollection<Item> workList;
CancellationTokenSource cts;
int itemcount

public void Run()
{
  int num_workers = 4;

  //create worklist, filled with initial work
  worklist = new BlockingCollection<Item>(
    new ConcurrentQueue<Item>(GetInitialWork()));

  cts = new CancellationTokenSource();
  itemcount = worklist.Count();

  for( int i = 0; i < num_workers; i++)
    Task.Factory.StartNew( RunWorker );
}

IEnumberable<Item> GetInitialWork() { ... }

public void RunWorker() {
  try  {
    do {
      Item i = worklist.Take( cts.Token );
      //blocks until item available or cancelled
          Process(i);
      //exit loop if no more items left
    } while (Interlocked.Decrement( ref itemcount) > 0);
  } finally {
      if( ! cts.IsCancellationRequested )
        cts.Cancel();
    }
  }
}

public void AddWork( Item item) {
  Interlocked.Increment( ref itemcount );
  worklist.Add(item);
}

public void Process( Item i ) 
{
  //Do what you want to the work item here.
}

Приведенный выше код позволяет добавлять элементы рабочего списка в очередь и позволяет задать произвольное количество рабочих процессов (в данном случае четыре), которые будут извлекать элементы из очереди и обрабатывать их.

Другим замечательным ресурсом для Parallelism on .Net 4.0 является книга «Параллельное программирование с Microsoft .Net», которая находится в свободном доступе по адресу: http://msdn.microsoft.com/en-us/library/ff963553

person Xantix    schedule 23.06.2012
comment
+1 Пожалуйста, помогите мне. Я не могу понять, как получить книгу бесплатно по этой ссылке. - person paparazzo; 23.06.2012
comment
@Blam Просто нажмите на разделы в левом меню. - person svick; 23.06.2012
comment
Спасибо, у меня сломалась левая. - person paparazzo; 23.06.2012

Внутри библиотеки параллельных задач Parallel.For и Parallel.Foreach следуют алгоритму восхождения на вершину, чтобы определить, сколько параллелизма следует использовать для операции.

Более или менее они начинают с запуска тела на одной задаче, переходят к двум и так далее, пока не будет достигнута точка останова и им не потребуется уменьшить количество задач.

Это очень хорошо работает для тел методов, которые выполняются быстро, но если тело выполняется долго, может пройти много времени, прежде чем оно поймет, что необходимо уменьшить степень параллелизма. До этого момента он продолжает добавлять задачи и, возможно, приводит к сбою компьютера.

Я узнал об этом во время лекции, прочитанной одним из разработчиков Task Parallel Library.

Указание MaxDegreeOfParallelism, вероятно, самый простой способ.

person Xantix    schedule 22.06.2012
comment
ThreadPool использует восхождение на холм, а не Parallel.ForEach. Это запустит столько Task, сколько TaskScheduler позволит ему запуститься. И поскольку здесь кажется, что Tasks могут блокироваться в течение (относительно) долгого времени, это будет большое число. Но да, MDOP, пожалуй, лучшее, что можно здесь сделать. - person svick; 23.06.2012