Използване на SqlBulkCopy в многопоточен сценарий с проблем с ThreadPool

Изправен съм пред дилема (!).

В първия сценарий внедрих решение, което репликира данни от една база данни в друга с помощта на SQLBulkCopy синхронно и нямах никакъв проблем.

Сега, използвайки ThreadPool, внедрих същото в асинхронен сценарий, нишка на таблица и всичко работи добре, но след известно време (обикновено 1 час, тъй като операциите за копиране отнемат приблизително същото време), операциите изпращат до ThreadPool спре да се изпълнява. Има един различен SQLBulkCopy, използващ един различен SQLConnection на нишка.

Вече виждам броя на свободните нишки и всички те са безплатни в началото на извикването. Имам един AutoResetEvent, за да изчакам нишките да свършат работата си, преди да стартирам отново, и Semaphore FIFO, който държи брояча на активните нишки.

Има ли някакъв проблем, който съм забравил или който трябва да използвам, когато използвам SqlBulkCopy? Оценявам малко помощ, защото идеите ми свършиха;)


-> Използване

SemaphoreFIFO waitingThreads = new SemaphoreFIFO();
AutoResetEvent autoResetEvent = new AutoResetEvent(false);
(...)
List<TableMappingHolder> list = BulkCopy.Mapping(tables);
waitingThreads.Put(list.Count, 300000);

for (int i = 0; i < list.Count; i++){
    ThreadPool.QueueUserWorkItem(call =>
         //Replication
         (...)
         waitingThreads.Get();

        if (waitingThreads.Counter == 0)
            autoResetEvent.Set();
    );
}

bool finalized = finalized = autoResetEvent.WaitOne(300000);
(...)

//Групово копие

 public bool SetData(SqlDataReader reader, string _destinationTableName, List<SqlBulkCopyColumnMapping> _sqlBulkCopyColumnMappings)
        {
            using (SqlConnection destinationConnection =
                            new SqlConnection(ConfigurationManager.ConnectionStrings["dconn"].ToString()))
            {
                destinationConnection.Open();

                // Set up the bulk copy object.
                // Note that the column positions in the source
                // data reader match the column positions in
                // the destination table so there is no need to
                // map columns.
                using (SqlBulkCopy bulkCopy =
                           new SqlBulkCopy(destinationConnection))                    {
                    bulkCopy.BulkCopyTimeout = 300000;
                    bulkCopy.DestinationTableName = _destinationTableName;

                    // Set up the column mappings by name.
                    foreach (SqlBulkCopyColumnMapping columnMapping in _sqlBulkCopyColumnMappings)
                        bulkCopy.ColumnMappings.Add(columnMapping);

                    try{
                        // Write from the source to the destination.
                        bulkCopy.WriteToServer(reader);
                    }
                    catch (Exception ex){return false;}
                    finally
                    {
                        try{reader.Close();}
                        catch (Exception e){//log}
                        try{bulkCopy.Close();}
                        catch (Exception e){//log}
                        try{destinationConnection.Close(); }
                        catch (Exception e){ //log    }
                    }
                }
            }
            return true;
        }
#

Семафор

public sealed class SemaphoreFIFO
{
    private int _counter;
    private readonly LinkedList<int> waitQueue = new LinkedList<int>();

    public int Counter
    {
        get { return _counter; }
    }

    private void internalNotify()
    {
        if (waitQueue.Count > 0 && _counter == 0)
        {
            Monitor.PulseAll(waitQueue);
        }
    }

    public void Get()
    {
        lock (waitQueue)
        {
            _counter --;
            internalNotify();
        }
    }

    public bool Put(int n, int timeout)
    {
        if (timeout < 0 && timeout != Timeout.Infinite)
            throw new ArgumentOutOfRangeException("timeout");
        if (n < 0)
            throw new ArgumentOutOfRangeException("n");

        lock (waitQueue)
        {
            if (waitQueue.Count == 0 && _counter ==0)
            {
                _counter +=n;
                internalNotify();
                return true;
            }

            int endTime = Environment.TickCount + timeout;
            LinkedListNode<int> me = waitQueue.AddLast(n);
            try
            {
                while (true)
                {
                    Monitor.Wait(waitQueue, timeout);

                    if (waitQueue.First == me && _counter ==0)
                    {
                        _counter += n;
                        waitQueue.RemoveFirst();
                        internalNotify();
                        return true;
                    }

                    if (timeout != Timeout.Infinite)
                    {
                        int remainingTime = endTime - Environment.TickCount;
                        if (remainingTime <= 0)
                        {
                            // TIMEOUT
                            if (waitQueue.First == me)
                            {
                                waitQueue.RemoveFirst();
                                internalNotify();
                            }
                            else
                                waitQueue.Remove(me);
                            return false;
                        }
                        timeout = remainingTime;
                    }
                }
            }
            catch (ThreadInterruptedException e)
            {
                // INTERRUPT
                if (waitQueue.First == me)
                {
                    waitQueue.RemoveFirst();
                    internalNotify();
                }
                else
                    waitQueue.Remove(me);
                throw e;
            }
        }
    }
}

person Soulbe    schedule 06.04.2010    source източник


Отговори (1)


Просто бих се върнал към използването на SQLBulkCopy синхронно. Не съм сигурен какво печелите, като правите куп групови копия едновременно (вместо едно след друго). Може да завърши всичко малко по-бързо, но дори не съм сигурен в това.

person MusiGenesis    schedule 06.04.2010
comment
В моята производствена среда това е синхронно, но това, което искам да постигна, е данните във всички таблици да се записват почти едновременно, защото трябва да се използват от друго приложение - person Soulbe; 06.04.2010
comment
Предполагам, че няма да спечелите много (ако въобще) от този подход. Мисля, че ако стартирате две групови копия едновременно, всяко ще работи около половината по-бързо, отколкото ако се изпълнява отделно, така че нетното време за завършване ще бъде същото. Никога не съм чувал това да се прави (няколко едновременни групови вмъквания) и може да се окаже, че вашите нишки спират да се изпълняват, защото се сблъсквате с фундаментално ограничение за това как могат да се правят групови вмъквания едновременно. - person MusiGenesis; 06.04.2010
comment
Благодаря за коментара Наистина, ако коментирам груповото вмъкване и дам на другите достъп до данните, приложението работи добре! - person Soulbe; 07.04.2010