Использование SqlBulkCopy в многопоточном сценарии с проблемой ThreadPool

Я столкнулся с дилеммой (!).

В первом сценарии я реализовал решение, которое синхронно реплицирует данные из одной базы данных в другую, используя SQLBulkCopy, и у меня не было никаких проблем.

Теперь, используя ThreadPool, я реализовал то же самое в асинхронном сценарии, поток на таблицу, и все работает нормально, но через некоторое время (обычно 1 час, потому что операции копирования занимают примерно столько же времени), операции отправляются в ThreadPool прекратить выполняться. Существует один другой SQLBulkCopy, использующий один другой SQLConnection для каждого потока.

Я уже вижу количество свободных потоков, и все они свободны в начале вызова. У меня есть один AutoResetEvent, чтобы дождаться завершения работы потоков перед повторным запуском, и FIFO семафора, который содержит счетчик активных потоков.

Есть ли какая-то проблема, которую я забыл или которую я должен устранить при использовании SqlBulkCopy? Я ценю некоторую помощь, потому что мои идеи закончились;)


-> Использование

SemaphoreFIFO waitingThreads = new SemaphoreFIFO();
AutoResetEvent autoResetEvent = new AutoResetEvent(false);
(...)
List<TableMappingHolder> list = BulkCopy.Mapping(tables);
waitingThreads.Put(list.Count, 300000);

for (int i = 0; i < list.Count; i++){
    ThreadPool.QueueUserWorkItem(call =>
         //Replication
         (...)
         waitingThreads.Get();

        if (waitingThreads.Counter == 0)
            autoResetEvent.Set();
    );
}

bool finalized = finalized = autoResetEvent.WaitOne(300000);
(...)

//Массовое копирование

 public bool SetData(SqlDataReader reader, string _destinationTableName, List<SqlBulkCopyColumnMapping> _sqlBulkCopyColumnMappings)
        {
            using (SqlConnection destinationConnection =
                            new SqlConnection(ConfigurationManager.ConnectionStrings["dconn"].ToString()))
            {
                destinationConnection.Open();

                // Set up the bulk copy object.
                // Note that the column positions in the source
                // data reader match the column positions in
                // the destination table so there is no need to
                // map columns.
                using (SqlBulkCopy bulkCopy =
                           new SqlBulkCopy(destinationConnection))                    {
                    bulkCopy.BulkCopyTimeout = 300000;
                    bulkCopy.DestinationTableName = _destinationTableName;

                    // Set up the column mappings by name.
                    foreach (SqlBulkCopyColumnMapping columnMapping in _sqlBulkCopyColumnMappings)
                        bulkCopy.ColumnMappings.Add(columnMapping);

                    try{
                        // Write from the source to the destination.
                        bulkCopy.WriteToServer(reader);
                    }
                    catch (Exception ex){return false;}
                    finally
                    {
                        try{reader.Close();}
                        catch (Exception e){//log}
                        try{bulkCopy.Close();}
                        catch (Exception e){//log}
                        try{destinationConnection.Close(); }
                        catch (Exception e){ //log    }
                    }
                }
            }
            return true;
        }
#

семафор

public sealed class SemaphoreFIFO
{
    private int _counter;
    private readonly LinkedList<int> waitQueue = new LinkedList<int>();

    public int Counter
    {
        get { return _counter; }
    }

    private void internalNotify()
    {
        if (waitQueue.Count > 0 && _counter == 0)
        {
            Monitor.PulseAll(waitQueue);
        }
    }

    public void Get()
    {
        lock (waitQueue)
        {
            _counter --;
            internalNotify();
        }
    }

    public bool Put(int n, int timeout)
    {
        if (timeout < 0 && timeout != Timeout.Infinite)
            throw new ArgumentOutOfRangeException("timeout");
        if (n < 0)
            throw new ArgumentOutOfRangeException("n");

        lock (waitQueue)
        {
            if (waitQueue.Count == 0 && _counter ==0)
            {
                _counter +=n;
                internalNotify();
                return true;
            }

            int endTime = Environment.TickCount + timeout;
            LinkedListNode<int> me = waitQueue.AddLast(n);
            try
            {
                while (true)
                {
                    Monitor.Wait(waitQueue, timeout);

                    if (waitQueue.First == me && _counter ==0)
                    {
                        _counter += n;
                        waitQueue.RemoveFirst();
                        internalNotify();
                        return true;
                    }

                    if (timeout != Timeout.Infinite)
                    {
                        int remainingTime = endTime - Environment.TickCount;
                        if (remainingTime <= 0)
                        {
                            // TIMEOUT
                            if (waitQueue.First == me)
                            {
                                waitQueue.RemoveFirst();
                                internalNotify();
                            }
                            else
                                waitQueue.Remove(me);
                            return false;
                        }
                        timeout = remainingTime;
                    }
                }
            }
            catch (ThreadInterruptedException e)
            {
                // INTERRUPT
                if (waitQueue.First == me)
                {
                    waitQueue.RemoveFirst();
                    internalNotify();
                }
                else
                    waitQueue.Remove(me);
                throw e;
            }
        }
    }
}

person Soulbe    schedule 06.04.2010    source источник


Ответы (1)


Я бы просто вернулся к синхронному использованию SQLBulkCopy. Я не уверен, что вы получите, делая кучу массовых копий одновременно (а не одну за другой). Это может завершить все немного быстрее, но я даже не уверен в этом.

person MusiGenesis    schedule 06.04.2010
comment
В моей производственной среде это синхронно, но я хочу добиться того, чтобы данные во всех таблицах записывались почти одновременно, потому что они должны использоваться другим приложением. - person Soulbe; 06.04.2010
comment
Я предполагаю, что вы не получите многого (если вообще) от этого подхода. Я думаю, что если вы запустите две массовые копии одновременно, каждая из них будет работать примерно вдвое медленнее, чем при отдельном запуске, поэтому чистое время завершения будет таким же. Я никогда не слышал об этом (несколько одновременных массовых вставок), и может случиться так, что ваши потоки перестанут выполняться, потому что вы сталкиваетесь с фундаментальным ограничением на то, как массовые вставки могут выполняться одновременно. - person MusiGenesis; 06.04.2010
comment
Спасибо за комментарий. Действительно, если я прокомментирую массовую вставку и предоставлю другим доступ к данным, приложение будет работать нормально! - person Soulbe; 07.04.2010