使用生產者/消費者模式和SqlBulkCopy使用多個線程處理SQL文件數據塊中的平面文件

c# flat-file multithreading sqlbulkcopy sql-server

我希望你能忍受我。我想提供盡可能多的信息。主要問題是如何創建一個將由多個線程使用的結構(如堆棧),該線程將彈出一個值並使用它來處理一個大的平面文件,並可能一次又一次地循環,直到處理完整個文件。如果一個文件有100.000條記錄,可以由5個線程使用2.000行塊處理,那麼每個線程將獲得10個塊進行處理。

我的目標是將數據移動到平面文件中(帶標題...副標題...詳細信息,詳細信息,詳細信息,...詳細信息,子圖表,子標題...詳細信息,詳細信息,詳細信息,...詳細信息,子圖表, Subheader ...詳細信息,詳細信息,...詳細信息,SubFooter,頁腳結構)進入OLTP DB,恢復模式為簡單(可能為Full)為3個表:1st表示Subheader的唯一鍵存在於Subheader行,2nd表示中間表SubheaderGroup,表示以2000個記錄的塊為單位對細節行進行分組(需要將Subheader的Identity PK作為其FK,第3個表示具有FK指向Subheader PK的Detail行。

我正在進行手動事務管理,因為我可以擁有數万個Detail行,並且我在加載期間使用在目標表中設置為0的特殊字段,然後在文件處理結束時我正在進行事務更新值為1,可以指示其他應用程序已完成加載。

我想將這個平面文件切割成多個相等的部分(相同的行數),這些部分可以使用多個線程進行處理,並使用從目標表元數據創建的IDataReader使用SqlBulkCopy導入。

我想使用生產者/消費者模式(如下面的鏈接 - pdf分析和代碼示例中所述)將SqlBulkCopy與SqlBulkCopyOptions.TableLock選項一起使用。 http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspx此模式允許創建多個生產者,並且相同數量的消費者需要訂閱生產者來使用該行。

在TestSqlBulkCopy項目中,DataProducer.cs文件有一個模擬數千條記錄生成的方法。

public void Produce (DataConsumer consumer, int numberOfRows) {
    int bufferSize = 100000;
    int numberOfBuffers = numberOfRows / bufferSize;

    for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
        DataTable buffer = consumer.GetBufferDataTable ();

        for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
            object[] values = GetRandomRow (consumer);
            buffer.Rows.Add (values);
        }
        consumer.AddBufferDataTable (buffer);
    }
}

此方法將在新線程的上下文中執行。我希望這個新線程只讀取一個獨特的原始平面文件塊,另一個線程將繼續處理下一個塊。然後,消費者將使用SqlBulkCopy ADO.NET類將數據(通過它們泵送到SQL Server DB)移動到SQL Server DB。

所以這裡的問題是關於主程序規定每個線程應該處理lineFrom to lineTo,我認為這應該在線程創建期間發生。第二個解決方案可能是線程共享一些結構並使用它們獨有的東西(如線程號或序列號)來查找共享結構(可能是一個堆棧並彈出一個值(在執行時鎖定一個堆棧)然後下一個線程將然後拾取下一個值。主程序將選擇平面文件並確定塊的大小並創建堆棧。

那麼有人可以提供一些代碼片段,關於多個線程如何處理一個文件並且只獲得該文件的唯一部分的偽鱈魚?

謝謝,Rad

一般承認的答案

對我來說有用的是使用隊列來保存未處理的工作和字典來跟踪正在進行的工作:

  1. 創建一個包含文件名,起始行和行計數的工作類,並具有執行數據庫插入的更新方法。傳遞一個回調方法,工作者在完成時用它來發出信號。
  2. 使用worker類的實例加載一個Queue,每個chunk一個。
  3. 生成一個調度程序線程,該線程使工作程序實例出列,啟動其更新方法,並將工作程序實例添加到字典中,該字符串由其線程的ManagedThreadId鍵入。執行此操作,直到達到最大允許線程數,如Dictionary.Count所示。調度程序等待一個線程完成然後啟動另一個線程。它有幾種方法可以等待。
  4. 每個線程完成後,其回調將從Dictionary中刪除其ManagedThreadId。如果線程因錯誤而退出(例如連接超時),則回調可以將worker重新插入Queue。這是更新UI的好地方。
  5. 您的UI可以顯示活動線程,總進度和每個塊的時間。它可以讓用戶調整活動線程數,暫停處理,顯示錯誤或提前停止。
  6. 當隊列和詞典為空時,你就完成了。

演示代碼作為控制台應用程序:

using System;
using System.Collections.Generic;
using System.Threading;

namespace threadtest
{
    public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);

    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supv = new Supervisor();
            supv.LoadQueue();
            supv.Dispatch();
        }
    }

    public class Supervisor
    {
        public Queue<Worker> pendingWork = new Queue<Worker>();
        public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();

        private object pendingLock = new object();
        private object activeLock = new object();

        private int maxThreads = 200;

        public void LoadQueue()
        {
            for (int i = 0; i < 1000; i++)
            {
                Worker worker = new Worker();
                worker.Callback = new DoneCallbackDelegate(WorkerFinished);
                lock (pendingLock)
                {
                    pendingWork.Enqueue(worker);
                }
            }
        }

        public void Dispatch()
        {
            int activeThreadCount;

            while (true)
            {
                lock (activeLock) { activeThreadCount = activeWork.Count; }
                while (true)
                {
                    lock (activeLock)
                    {
                        if (activeWork.Count == maxThreads) break;
                    }
                    lock (pendingWork)
                    {
                        if (pendingWork.Count > 0)
                        {
                            Worker worker = pendingWork.Dequeue();
                            Thread thread = new Thread(new ThreadStart(worker.DoWork));
                            thread.IsBackground = true;
                            worker.ThreadId = thread.ManagedThreadId;
                            lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
                            thread.Start();
                        }
                        else
                        {
                            break;
                        }
                    }
                }
                Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)

                lock (pendingLock)
                    lock (activeLock)
                    {
                        if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
                    }
            }
        }

        // remove finished threads from activeWork, resubmit if necessary, and update UI
        public void WorkerFinished(int idArg, bool successArg, string messageArg)
        {
            lock (pendingLock)
                lock (activeLock)
                {
                    Worker worker = activeWork[idArg];
                    activeWork.Remove(idArg);
                    if (!successArg)
                    {
                        // check the message or something to see if you should resubmit thread
                        pendingWork.Enqueue(worker);
                    }
                    // update UI
                    int left = Console.CursorLeft;
                    int top = Console.CursorTop;
                    Console.WriteLine(string.Format("pending:{0} active:{1}        ", pendingWork.Count, activeWork.Count));
                    Console.SetCursorPosition(left, top);
                }
        }
    }

    public class Worker
    {
        // this is where you put in your problem-unique stuff
        public int ThreadId { get; set; }

        DoneCallbackDelegate callback;
        public DoneCallbackDelegate Callback { set { callback = value; } }

        public void DoWork()
        {
            try
            {
                Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
                callback(ThreadId, true, null);
            }
            catch (Exception ex)
            {
                callback(ThreadId, false, ex.ToString());
            }
        }
    }
}


許可下: CC-BY-SA with attribution
不隸屬於 Stack Overflow
這個KB合法嗎? 是的,了解原因
許可下: CC-BY-SA with attribution
不隸屬於 Stack Overflow
這個KB合法嗎? 是的,了解原因