あなたが私に耐えられることを願っています。できるだけ多くの情報を提供したかったのです。主な問題は、値をポップアップして1つの大きなフラットファイルを処理し、ファイル全体が処理されるまで何度も何度も繰り返し実行する複数のスレッドによって使用される構造(スタックなど)を作成する方法です。ファイルに100.000レコードがあり、2.000行のチャンクを使用して5つのスレッドで処理できる場合、各スレッドは処理するチャンクを10個取得します。
私の目標は、フラットファイルでデータを移動することです(ヘッダ付き...サブヘッダ...詳細、詳細、詳細、...詳細、サブフット、サブヘッダ...詳細、詳細、詳細、...詳細、 Subheader行に存在するサブヘッダの一意のキーを表す1番目のもの、中間のものを2番目のものとする3番目のテーブルに、回復モードがSimple(可能なFull)になっているOLTP DBに、 subheaderGroup。2000レコードのチャンク内の詳細行のグループ化を表します(サブヘッダーのPKをFKとして、FKBがサブヘッダーPKを示す詳細行を表す必要があります)。
私は手動トランザクション管理をしています。なぜなら、何千ものディテール行があり、ロード中にデスティネーションテーブルで0に設定された特別なフィールドを使用していて、ファイル処理の最後にトランザクションをアップレートしています値を1に設定すると、ロードが完了したことを他のアプリケーションに通知できます。
このフラットファイルを、複数のスレッドで処理し、Destinationテーブルのメタデータから作成されたIDataReaderを使用してSqlBulkCopyを使用してインポートできる複数の等しい数(同じ行数)に分割したいと思います。
私は、SqlBulkCopyをSqlBulkCopyOptions.TableLockオプションと共に使用するために、プロデューサ/コンシューマパターン(以下のリンクで説明されているように - pdf解析とコードサンプル)を使用したいと思います。 http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspxこのパターンでは、複数のプロデューサを作成することができ、同等の数のコンシューマが、プロデューサを購読してその行を消費する必要があります。
TestSqlBulkCopyプロジェクトでは、DataProducer.csファイルに数千のレコードの生成をシミュレートするメソッドがあります。
public void Produce (DataConsumer consumer, int numberOfRows) {
int bufferSize = 100000;
int numberOfBuffers = numberOfRows / bufferSize;
for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
DataTable buffer = consumer.GetBufferDataTable ();
for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
object[] values = GetRandomRow (consumer);
buffer.Rows.Add (values);
}
consumer.AddBufferDataTable (buffer);
}
}
このメソッドは、新しいスレッドのコンテキストで実行されます。私はこの新しいスレッドが元のフラットファイルの一意のチャンクだけを読み込み、別のスレッドが次のチャンクの処理を開始することを望みます。その後、消費者はSqlBulkCopy ADO.NETクラスを使用してSQL Server DBにデータを転送します。
ですから、ここで問題となるのは、メインプログラムが各スレッドによってどのlineFromからlineToに処理されるべきかを指示することです。スレッドの作成中にそれが発生するはずです。 2番目の解決策はおそらく、スレッドがいくつかの構造体を共有し、共有構造体を参照するために(スレッド番号やシーケンス番号などの)独自のものを使用することです(スタックを作成してスタックをロックします)。メインプログラムはフラットファイルを選択し、チャンクのサイズを決定してスタックを作成します。
だから、誰かがいくつかのコードスニペット、複数のスレッドが1つのファイルを処理し、そのファイルのユニークな部分だけを取得する方法に関する疑似コードを提供できますか?
ありがとう、ラド
私にとってうまくいくのは、処理されていない作業と辞書を含むキューを使用して、作業中の作業を追跡することです。
コンソールアプリケーションとしてのデモコード:
using System;
using System.Collections.Generic;
using System.Threading;
namespace threadtest
{
public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);
class Program
{
static void Main(string[] args)
{
Supervisor supv = new Supervisor();
supv.LoadQueue();
supv.Dispatch();
}
}
public class Supervisor
{
public Queue<Worker> pendingWork = new Queue<Worker>();
public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();
private object pendingLock = new object();
private object activeLock = new object();
private int maxThreads = 200;
public void LoadQueue()
{
for (int i = 0; i < 1000; i++)
{
Worker worker = new Worker();
worker.Callback = new DoneCallbackDelegate(WorkerFinished);
lock (pendingLock)
{
pendingWork.Enqueue(worker);
}
}
}
public void Dispatch()
{
int activeThreadCount;
while (true)
{
lock (activeLock) { activeThreadCount = activeWork.Count; }
while (true)
{
lock (activeLock)
{
if (activeWork.Count == maxThreads) break;
}
lock (pendingWork)
{
if (pendingWork.Count > 0)
{
Worker worker = pendingWork.Dequeue();
Thread thread = new Thread(new ThreadStart(worker.DoWork));
thread.IsBackground = true;
worker.ThreadId = thread.ManagedThreadId;
lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
thread.Start();
}
else
{
break;
}
}
}
Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)
lock (pendingLock)
lock (activeLock)
{
if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
}
}
}
// remove finished threads from activeWork, resubmit if necessary, and update UI
public void WorkerFinished(int idArg, bool successArg, string messageArg)
{
lock (pendingLock)
lock (activeLock)
{
Worker worker = activeWork[idArg];
activeWork.Remove(idArg);
if (!successArg)
{
// check the message or something to see if you should resubmit thread
pendingWork.Enqueue(worker);
}
// update UI
int left = Console.CursorLeft;
int top = Console.CursorTop;
Console.WriteLine(string.Format("pending:{0} active:{1} ", pendingWork.Count, activeWork.Count));
Console.SetCursorPosition(left, top);
}
}
}
public class Worker
{
// this is where you put in your problem-unique stuff
public int ThreadId { get; set; }
DoneCallbackDelegate callback;
public DoneCallbackDelegate Callback { set { callback = value; } }
public void DoWork()
{
try
{
Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
callback(ThreadId, true, null);
}
catch (Exception ex)
{
callback(ThreadId, false, ex.ToString());
}
}
}
}