プロデューサ/コンシューマパターンとSqlBulkCopyを使用して複数のスレッドを使用してチャンク内のフラットファイルをSQL Server DBに処理する

c# flat-file multithreading sqlbulkcopy sql-server

質問

あなたが私に耐えられることを願っています。できるだけ多くの情報を提供したかったのです。主な問題は、値をポップアップして1つの大きなフラットファイルを処理し、ファイル全体が処理されるまで何度も何度も繰り返し実行する複数のスレッドによって使用される構造(スタックなど)を作成する方法です。ファイルに100.000レコードがあり、2.000行のチャンクを使用して5つのスレッドで処理できる場合、各スレッドは処理するチャンクを10個取得します。

私の目標は、フラットファイルでデータを移動することです(ヘッダ付き...サブヘッダ...詳細、詳細、詳細、...詳細、サブフット、サブヘッダ...詳細、詳細、詳細、...詳細、 Subheader行に存在するサブヘッダの一意のキーを表す1番目のもの、中間のものを2番目のものとする3番目のテーブルに、回復モードがSimple(可能なFull)になっているOLTP DBに、 subheaderGroup。2000レコードのチャンク内の詳細行のグループ化を表します(サブヘッダーのPKをFKとして、FKBがサブヘッダーPKを示す詳細行を表す必要があります)。

私は手動トランザクション管理をしています。なぜなら、何千ものディテール行があり、ロード中にデスティネーションテーブルで0に設定された特別なフィールドを使用していて、ファイル処理の最後にトランザクションをアップレートしています値を1に設定すると、ロードが完了したことを他のアプリケーションに通知できます。

このフラットファイルを、複数のスレッドで処理し、Destinationテーブルのメタデータから作成されたIDataReaderを使用してSqlBulkCopyを使用してインポートできる複数の等しい数(同じ行数)に分割したいと思います。

私は、SqlBulkCopyをSqlBulkCopyOptions.TableLockオプションと共に使用するために、プロデューサ/コンシューマパターン(以下のリンクで説明されているように - pdf解析とコードサンプル)を使用したいと思います。 http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspxこのパターンでは、複数のプロデューサを作成することができ、同等の数のコンシューマが、プロデューサを購読してその行を消費する必要があります。

TestSqlBulkCopyプロジェクトでは、DataProducer.csファイルに数千のレコードの生成をシミュレートするメソッドがあります。

public void Produce (DataConsumer consumer, int numberOfRows) {
    int bufferSize = 100000;
    int numberOfBuffers = numberOfRows / bufferSize;

    for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
        DataTable buffer = consumer.GetBufferDataTable ();

        for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
            object[] values = GetRandomRow (consumer);
            buffer.Rows.Add (values);
        }
        consumer.AddBufferDataTable (buffer);
    }
}

このメソッドは、新しいスレッドのコンテキストで実行されます。私はこの新しいスレッドが元のフラットファイルの一意のチャンクだけを読み込み、別のスレッドが次のチャンクの処理を開始することを望みます。その後、消費者はSqlBulkCopy ADO.NETクラスを使用してSQL Server DBにデータを転送します。

ですから、ここで問題となるのは、メインプログラムが各スレッドによってどのlineFromからlineToに処理されるべきかを指示することです。スレッドの作成中にそれが発生するはずです。 2番目の解決策はおそらく、スレッドがいくつかの構造体を共有し、共有構造体を参照するために(スレッド番号やシーケンス番号などの)独自のものを使用することです(スタックを作成してスタックをロックします)。メインプログラムはフラットファイルを選択し、チャンクのサイズを決定してスタックを作成します。

だから、誰かがいくつかのコードスニペット、複数のスレッドが1つのファイルを処理し、そのファイルのユニークな部分だけを取得する方法に関する疑似コードを提供できますか?

ありがとう、ラド

受け入れられた回答

私にとってうまくいくのは、処理されていない作業と辞書を含むキューを使用して、作業中の作業を追跡することです。

  1. ファイル名、開始行、行数を取り、データベースの挿入を行う更新メソッドを持つワーカークラスを作成します。ワーカーが完了したときに通知するために使用するコールバックメソッドを渡します。
  2. 各チャンクに対して1つずつ、ワーカークラスのインスタンスでキューをロードします。
  3. ワーカーインスタンスをデキューし、その更新メソッドを起動し、ワーカーインスタンスをディクショナリに追加し、そのスレッドのManagedThreadIdをキーとするディスパッチャスレッドを生成します。これは、Dictionary.Countで指摘されているように、最大​​許容スレッド数に達するまで行います。ディスパッチャは、スレッドが終了してから別のスレッドが起動するまで待機します。それを待ついくつかの方法があります。
  4. 各スレッドが終了すると、そのコールバックはそのManagedThreadIdをDictionaryから削除します。エラー(接続タイムアウトなど)が原因でスレッドが終了した場合、コールバックはワーカーをキューに再挿入できます。これはあなたのUIを更新するのに適しています。
  5. あなたのUIは、アクティブスレッド、合計進行状況、およびチャンクあたりの時間を表示できます。ユーザーがアクティブなスレッドの数を調整したり、処理を一時停止したり、エラーを表示したり、早期に停止したりすることができます。
  6. キューとディクショナリが空の場合、完了です。

コンソールアプリケーションとしてのデモコード:

using System;
using System.Collections.Generic;
using System.Threading;

namespace threadtest
{
    public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);

    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supv = new Supervisor();
            supv.LoadQueue();
            supv.Dispatch();
        }
    }

    public class Supervisor
    {
        public Queue<Worker> pendingWork = new Queue<Worker>();
        public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();

        private object pendingLock = new object();
        private object activeLock = new object();

        private int maxThreads = 200;

        public void LoadQueue()
        {
            for (int i = 0; i < 1000; i++)
            {
                Worker worker = new Worker();
                worker.Callback = new DoneCallbackDelegate(WorkerFinished);
                lock (pendingLock)
                {
                    pendingWork.Enqueue(worker);
                }
            }
        }

        public void Dispatch()
        {
            int activeThreadCount;

            while (true)
            {
                lock (activeLock) { activeThreadCount = activeWork.Count; }
                while (true)
                {
                    lock (activeLock)
                    {
                        if (activeWork.Count == maxThreads) break;
                    }
                    lock (pendingWork)
                    {
                        if (pendingWork.Count > 0)
                        {
                            Worker worker = pendingWork.Dequeue();
                            Thread thread = new Thread(new ThreadStart(worker.DoWork));
                            thread.IsBackground = true;
                            worker.ThreadId = thread.ManagedThreadId;
                            lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
                            thread.Start();
                        }
                        else
                        {
                            break;
                        }
                    }
                }
                Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)

                lock (pendingLock)
                    lock (activeLock)
                    {
                        if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
                    }
            }
        }

        // remove finished threads from activeWork, resubmit if necessary, and update UI
        public void WorkerFinished(int idArg, bool successArg, string messageArg)
        {
            lock (pendingLock)
                lock (activeLock)
                {
                    Worker worker = activeWork[idArg];
                    activeWork.Remove(idArg);
                    if (!successArg)
                    {
                        // check the message or something to see if you should resubmit thread
                        pendingWork.Enqueue(worker);
                    }
                    // update UI
                    int left = Console.CursorLeft;
                    int top = Console.CursorTop;
                    Console.WriteLine(string.Format("pending:{0} active:{1}        ", pendingWork.Count, activeWork.Count));
                    Console.SetCursorPosition(left, top);
                }
        }
    }

    public class Worker
    {
        // this is where you put in your problem-unique stuff
        public int ThreadId { get; set; }

        DoneCallbackDelegate callback;
        public DoneCallbackDelegate Callback { set { callback = value; } }

        public void DoWork()
        {
            try
            {
                Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
                callback(ThreadId, true, null);
            }
            catch (Exception ex)
            {
                callback(ThreadId, false, ex.ToString());
            }
        }
    }
}


ライセンスを受けた: CC-BY-SA with attribution
所属していない Stack Overflow
このKBは合法ですか? はい、理由を学ぶ
ライセンスを受けた: CC-BY-SA with attribution
所属していない Stack Overflow
このKBは合法ですか? はい、理由を学ぶ