Производителност на BlockingCollection(T).

За известно време в моята компания използвахме домашна реализация ObjectPool<T>, която осигурява блокиране на достъпа до нейното съдържание. Това е доста лесно: Queue<T>, object за заключване и AutoResetEvent за сигнал към нишка „заемане“, когато се добави елемент.

Същността на класа всъщност са тези два метода:

public T Borrow() {
    lock (_queueLock) {
        if (_queue.Count > 0)
            return _queue.Dequeue();
    }

    _objectAvailableEvent.WaitOne();

    return Borrow();
}

public void Return(T obj) {
    lock (_queueLock) {
        _queue.Enqueue(obj);
    }

    _objectAvailableEvent.Set();
}

Използваме този и няколко други класове за събиране вместо предоставените от System.Collections.Concurrent, защото използваме .NET 3.5, а не 4.0. Но наскоро открихме, че тъй като използваме реактивни разширения, всъщност < em>направете пространството от имена Concurrent достъпно за нас (в System.Threading.dll).

Естествено реших, че тъй като BlockingCollection<T> е един от основните класове в Concurrent namespace, вероятно ще предложи по-добра производителност от всичко, което аз или моите съотборници написах.

Така че се опитах да напиша нова реализация, която работи много просто:

public T Borrow() {
    return _blockingCollection.Take();
}

public void Return(T obj) {
    _blockingCollection.Add(obj);
}

За моя изненада, според някои прости тестове (заемане/връщане в пула няколко хиляди пъти от множество нишки), нашата първоначална реализация значително побеждава BlockingCollection<T> по отношение на производителността. И двете изглежда работят правилно; просто нашата оригинална реализация изглежда много по-бърза.

Въпросът ми:

  1. Защо би било това? Дали може би защото BlockingCollection<T> предлага по-голяма гъвкавост (разбрах, че работи чрез обвиване на IProducerConsumerCollection<T>), което непременно въвежда допълнителни разходи за производителност?
  2. Това просто погрешно използване на класа BlockingCollection<T> ли е?
  3. Ако това е подходящо използване на BlockingCollection<T>, просто не използвам ли правилно? Например, подходът Take/Add твърде опростен ли е и има ли много по-ефективен начин да получите същата функционалност?

Освен ако някой няма какво да предложи в отговор на този трети въпрос, изглежда, че засега ще се придържаме към нашата първоначална реализация.


person Dan Tao    schedule 14.06.2010    source източник
comment
Без да видите вашия бенчмарк, е трудно да коментирате. Може би BlockingCollection‹T› е оптимизиран за сценарии, които са различни от вашия бенчмарк?   -  person Joe    schedule 14.06.2010
comment
@Joe: сценарии? Хаха, хубаво. Както и да е, разбирам - скоро ще дам бенчмарк.   -  person Dan Tao    schedule 14.06.2010
comment
Виждали ли сте бялата книга на Microsoft за характеристиките на производителността на колекцията .net 4? Може просто да е неоптимален сценарий: blogs.msdn .com/b/pfxteam/archive/2010/04/26/9997562.aspx   -  person piers7    schedule 13.07.2010
comment
@piers: Благодаря за страхотната връзка! Не бях чел тази статия; Току-що го прегледах набързо и смятам скоро да го прочета по-задълбочено.   -  person Dan Tao    schedule 13.07.2010
comment
Направих сравнителен анализ и версията .NET 4.6.2, която тествах, е 3 пъти по-бърза. Тестът беше 2 нишки, един производител и един потребител, добавящи и премахващи в тесен цикъл. Може би .NET версията е оптимизирана от OP.   -  person Joe    schedule 21.09.2016


Отговори (3)


Тук има няколко потенциални възможности.

Първо, BlockingCollection<T> в Reactive Extensions е backport и не е точно същото като окончателната версия на .NET 4. Не бих се изненадал, ако производителността на този бекпорт се различава от .NET 4 RTM (въпреки че не съм профилирал специално тази колекция). Голяма част от TPL се представя по-добре в .NET 4, отколкото в .NET 3.5 backport.

Като се има предвид това, подозирам, че вашата реализация ще надхвърли производителността BlockingCollection<T>, ако имате една нишка на производител и една нишка на потребител. С един производител и един потребител вашето заключване ще има по-малко въздействие върху общата производителност, а събитието за нулиране е много ефективно средство за изчакване от страна на потребителя.

Въпреки това, BlockingCollection<T> е проектиран да позволи на много нишки на производителите да "поставят" данни много добре. Това няма да се представи добре с вашето внедряване, тъй като спорът за заключване ще започне да става проблематичен доста бързо.

Като се има предвид това, бих искал също да посоча едно погрешно схващане тук:

...вероятно ще предложи по-добра производителност от всичко, което аз или моите съотборници сме писали.

Това често не е вярно. Класовете за колекция от рамки обикновено се представят много добре, но често не са най-производителната опция за даден сценарий. Като се има предвид това, те са склонни да се представят добре, като същевременно са много гъвкави и много здрави. Те често са склонни да се мащабират много добре. „Домашно написаните“ класове за събиране често превъзхождат рамковите колекции в конкретни сценарии, но са склонни да бъдат проблематични, когато се използват в сценарии извън този, за който са специално проектирани. Подозирам, че това е една от тези ситуации.

person Reed Copsey    schedule 14.06.2010
comment
Не виждам как кодът ще се различава от две нишки срещу много нишки. Все още бихте използвали същите заключващи конструкции. Вероятно той тества и двете, като използва същия брой нишки. - person BC.; 14.06.2010
comment
@BC: BlockingCollection всъщност е колекция без заключване. Това го кара да се мащабира доста по-различно от изпълнението на OP. - person Reed Copsey; 14.06.2010

Опитах BlockingCollection срещу ConurrentQueue/AutoResetEvent комбо (подобно на решението на OP, но без заключване) в .Net 4 и последното комбо беше толкова много по-бързо за моя случай на употреба, че се отказах от BlockingCollection. За съжаление това беше преди почти година и не можах да намеря резултатите от бенчмарка.

Използването на отделно AutoResetEvent не прави нещата много по-сложни. Всъщност човек може дори да го абстрахира, веднъж завинаги, в BlockingCollectionSlim....

BlockingCollection вътрешно разчита и на ConcurrentQueue, но прави малко допълнително жонглиране с тънки семафори и токени за анулиране, което дава допълнителни функции, но на цена, дори когато не се използва. Трябва също да се отбележи, че BlockingCollection не е женен за ConcurrentQueue, но може да се използва и с други имплементатори на IProducerConsumerCollection вместо това.


Неограничена, доста гола реализация на BlockingCollectionSlim:

class BlockingCollectionSlim<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(false);
    public void Add(T item)
    {
        _queue.Enqueue(item);
        _autoResetEvent.Set();
    }
    public bool TryPeek(out T result)
    {
        return _queue.TryPeek(out result);
    }
    public T Take()
    {
        T item;
        while (!_queue.TryDequeue(out item))
            _autoResetEvent.WaitOne();
        return item;
    }
    public bool TryTake(out T item, TimeSpan patience)
    {
        if (_queue.TryDequeue(out item))
            return true;
        var stopwatch = Stopwatch.StartNew();
        while (stopwatch.Elapsed < patience)
        {
            if (_queue.TryDequeue(out item))
                return true;
            var patienceLeft = (patience - stopwatch.Elapsed);
            if (patienceLeft <= TimeSpan.Zero)
                break;
            else if (patienceLeft < MinWait)
            // otherwise the while loop will degenerate into a busy loop,
            // for the last millisecond before patience runs out
                patienceLeft = MinWait;
            _autoResetEvent.WaitOne(patienceLeft);
        }
        return false;
    }
    private static readonly TimeSpan MinWait = TimeSpan.FromMilliseconds(1);
person Evgeniy Berezovsky    schedule 26.03.2015

Попаднах на същите проблеми с производителността с BlockingCollection в .Net 4.7.2 и намерих тази публикация. Моят случай е MultipleProducers-MultipleConsumers, по-специално малки парчета данни се четат от много източници и трябва да се обработват от много филтри. Бяха използвани няколко (Env.ProcessorCount) BlockingCollections и завърших с профилиращ инструмент за производителност, който ми каза, че BlockingCollection.GetConsumingEnumerable.MoveNext() изяжда повече процесорно време от действителното филтриране!

Благодаря ти, @Eugene Beresovsky, за твоя код. FYI: В моята среда беше почти два пъти по-бавен от BlockingCollection. И така, ето моята SpinLocked BlockingCollection:

public class BlockingCollectionSpin<T>
{
    private SpinLock _lock = new SpinLock(false);
    private Queue<T> _queue = new Queue<T>();

    public void Add(T item)
    {
        bool gotLock = false;
        try
        {
            _lock.Enter(ref gotLock);
            _queue.Enqueue(item);
        }
        finally
        {
            if (gotLock) _lock.Exit(false);
        }
    }

    public bool TryPeek(out T result)
    {
        bool gotLock = false;
        try
        {
            _lock.Enter(ref gotLock);
            if (_queue.Count > 0)
            {
                result = _queue.Peek();
                return true;
            }
            else
            {
                result = default(T);
                return false;
            }
        }
        finally
        {
            if (gotLock) _lock.Exit(false);
        }
    }

    public T Take()
    {
        var spin = new SpinWait();
        do
        {
            bool gotLock = false;
            try
            {
                _lock.Enter(ref gotLock);
                if (_queue.Count > 0)
                    return _queue.Dequeue();
            }
            finally
            {
                if (gotLock) _lock.Exit(false);
            }
            spin.SpinOnce();
        } while (true);
    }
}

А за критичен за производителността код бих предложил да се избягва readonly модификатор на поле. Той добавя проверка на всеки достъп до поле в IL. Със следния тестов код

private static void TestBlockingCollections()
{
    const int workAmount = 10000000;
    var workerCount = Environment.ProcessorCount * 2;
    var sw = new Stopwatch();
    var source = new long[workAmount];
    var rnd = new Random();
    for (int i = 0; i < workAmount; i++)
        source[i] = rnd.Next(1000000);

    var swOverhead = 0.0;
    for (int i = 0; i < workAmount; i++)
    {
        sw.Restart();
        swOverhead += sw.Elapsed.TotalMilliseconds;
    }
    swOverhead /= workAmount;

    var sum1 = new long[workerCount];
    var queue1 = new BlockingCollection<long>(10000);
    var workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        foreach (var l in queue1.GetConsumingEnumerable())
            sum1[n] += l;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue1.Add(l);
    queue1.CompleteAdding();
    Task.WaitAll(workers);
    var elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollection {0:F4}ms", elapsed / workAmount);

    var sum2 = new long[workerCount];
    var queue2 = new BlockingCollectionSlim<long?>();
    workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        long? l;
        while ((l = queue2.Take()).HasValue)
            sum2[n] += l.Value;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue2.Add(l);
    for (int i = 0; i < workerCount; i++)
        queue2.Add(null);
    Task.WaitAll(workers);
    elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollectionSlim {0:F4}ms", elapsed / workAmount);

    var sum3 = new long[workerCount];
    var queue3 = new BlockingCollectionSpin<long?>();
    workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        long? l;
        while ((l = queue3.Take()).HasValue)
            sum3[n] += l.Value;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue3.Add(l);
    for (int i = 0; i < workerCount; i++)
        queue3.Add(null);
    Task.WaitAll(workers);
    elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollectionSpin {0:F4}ms", elapsed/workAmount);

    if (sum1.Sum() != sum2.Sum() || sum2.Sum() != sum3.Sum())
        Console.WriteLine("Wrong sum in the end!");

    Console.ReadLine();
}

На Core i5-3210M с 2 ядра и активиран HT имам следния изход:

BlockingCollection     0.0006ms
BlockingCollectionSlim 0.0010ms (Eugene Beresovsky implementation)
BlockingCollectionSpin 0.0003ms

И така, SpinLocked версията е два пъти по-бърза от .Net BlockingCollection. Но бих препоръчал да използвате само него! ако наистина предпочитате производителност срещу простота на кода (и поддръжка).

person Dmitry Shaurin    schedule 01.01.2019