Приведенный ниже код продолжает создавать потоки, даже если очередь пуста... пока в конечном итоге не возникнет исключение OutOfMemory. Если я заменю Parallel.ForEach на обычный foreach, этого не произойдет. кто-нибудь знает причины, почему это может произойти?
public delegate void DataChangedDelegate(DataItem obj);
public class Consumer
{
public DataChangedDelegate OnCustomerChanged;
public DataChangedDelegate OnOrdersChanged;
private CancellationTokenSource cts;
private CancellationToken ct;
private BlockingCollection<DataItem> queue;
public Consumer(BlockingCollection<DataItem> queue) {
this.queue = queue;
Start();
}
private void Start() {
cts = new CancellationTokenSource();
ct = cts.Token;
Task.Factory.StartNew(() => DoWork(), ct);
}
private void DoWork() {
Parallel.ForEach(queue.GetConsumingPartitioner(), item => {
if (item.DataType == DataTypes.Customer) {
OnCustomerChanged(item);
} else if(item.DataType == DataTypes.Order) {
OnOrdersChanged(item);
}
});
}
}