생산자 / 고객 패턴 및 SQL Server DB에 대한 SqlBulkCopy를 사용하여 여러 스레드를 사용하여 청크로 플랫 파일 처리

c# flat-file multithreading sqlbulkcopy sql-server

문제

네가 나를 참아 주길 바란다. 가능한 한 많은 정보를 제공하고 싶었습니다. 가장 큰 문제는 값을 띄우고 하나의 큰 플랫 파일을 처리하고 전체 파일이 처리 될 때까지 반복적으로 순환하기 위해 여러 스레드에서 사용하는 구조 (스택과 같은)를 만드는 방법입니다. 파일에 2.000 행 청크를 사용하는 5 개의 스레드가 처리 할 수있는 100.000 개의 레코드가있는 경우 각 스레드는 처리 할 청크를 10 개 가져옵니다.

내 목표는 플랫 파일에서 데이터를 이동하는 것입니다 (Header ... Subheader ... Detail, Detail, Detail, ... Detail, SubFooter, Subheader ... Detail, Detail, Detail, ... Detail, SubFooter, 세부 사항, 세부 사항, 세부 사항, 세부 사항, 세부 사항, 세부 사항, 세부 사항, SubFooter, 꼬리말 구조)를 복구 모드가 Simple (가능한 Full)로 3 개의 테이블로 갖는 OLTP DB에 저장합니다 : 1st은 Subheader 행에있는 Subheader의 고유 키, 표 SubheaderGroup, 2000 레코드 청크로 세부 행을 그룹화하는 것을 나타냅니다 (Subheader의 PK를 FK로, 세 번째로 세부 행을 표시하는 FK가 Subheader PK를 가리킴).

내가 수만 개의 세부 행을 가질 수 있기 때문에 나는 수동 트랜잭션 관리를하고있다. 그리고로드 중에 대상 테이블에 0으로 설정된 특수 필드를 사용하고 있으며, 파일 처리가 끝날 때 트랜잭션 변경을 수행하고있다. 1로 설정하면로드가 완료된 다른 응용 프로그램에 신호를 보낼 수 있습니다.

이 플랫 파일을 다중 스레드로 처리하고 대상 테이블 메타 데이터에서 생성 된 IDataReader를 사용하여 SqlBulkCopy를 사용하여 가져올 수있는 동일한 개수의 여러 행으로 잘라야합니다.

나는 SqlBulkCopy를 SqlBulkCopyOptions.TableLock 옵션과 함께 사용하기 위해 생산자 / 소비자 패턴 (아래 링크 - pdf 분석 및 코드 샘플에서 설명 됨)을 사용하고자합니다. http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspx 이 패턴을 사용하면 여러 개의 생산자를 만들 수 있으며 동일한 수의 소비자가 생산자를 구독하여 행을 소비해야합니다.

TestSqlBulkCopy 프로젝트에서 DataProducer.cs 파일에는 수천 개의 레코드 생성을 시뮬레이트하는 메서드가 있습니다.

public void Produce (DataConsumer consumer, int numberOfRows) {
    int bufferSize = 100000;
    int numberOfBuffers = numberOfRows / bufferSize;

    for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
        DataTable buffer = consumer.GetBufferDataTable ();

        for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
            object[] values = GetRandomRow (consumer);
            buffer.Rows.Add (values);
        }
        consumer.AddBufferDataTable (buffer);
    }
}

이 메소드는 새 스레드의 컨텍스트에서 실행됩니다. 나는이 새로운 스레드가 독창적 인 플랫 파일의 유일한 덩어리를 읽고 다른 스레드가 다음 덩어리를 처리하도록한다. 그러면 소비자는 SqlBulkCopy ADO.NET 클래스를 사용하여 데이터를 SQL Server DB로 이동합니다.

그래서 여기서 주요 질문은 lineFrom to lineTo가 각각의 쓰레드에 의해 처리되어야하는 쓰레드를 지정하는 것이고 쓰레드 생성 중에는 그렇게되어야한다고 생각합니다. 두 번째 해결 방안은 아마도 쓰레드가 구조체를 공유하고 쓰레드 번호 나 시퀀스 번호와 같은 고유 한 것을 사용하여 공유 구조체를 검색하고 (스택을 작성하고 스택을 잠그는) 스택을 잠그고 다음 스레드가 그 다음 값을 가져옵니다. 메인 프로그램은 플랫 파일을 선택하고 청크의 크기를 결정하고 스택을 생성합니다.

그래서 누군가가 하나의 파일을 처리하고 그 파일의 유일한 부분을 얻는 방법에 대한 코드 스 니펫, 의사 코드를 제공 할 수 있습니까?

고마워, Rad

수락 된 답변

나에게 유리한 점은 대기열을 사용하여 처리되지 않은 작업과 사전을 저장하여 진행중인 작업을 추적하는 것입니다.

  1. 파일 이름, 시작 줄 및 줄 수를 취하고 데이터베이스 삽입을 수행하는 업데이트 메서드가있는 작업자 클래스를 만듭니다. 작업자가 완료되었을 때 신호로 보내는 콜백 메소드를 전달하십시오.
  2. 각 청크마다 하나씩 작업자 클래스의 인스턴스가있는 대기열을로드합니다.
  3. 작업자 인스턴스를 대기열에서 제외하고 작업자 인스턴스를 업데이트하고 작업자 인스턴스를 해당 스레드의 ManagedThreadId가 입력하는 사전에 추가하는 디스패처 스레드를 생성합니다. Dictionary.Count에 명시된 최대 허용 스레드 수에 도달 할 때까지이 작업을 수행하십시오. 디스패처는 스레드가 완료 될 때까지 기다린 다음 다른 스레드를 시작합니다. 기다리는 데는 여러 가지 방법이 있습니다.
  4. 각 스레드가 완료되면 해당 콜백은 해당 ManagedThreadId를 사전에서 제거합니다. 오류 (예 : 연결 시간 초과)로 인해 스레드가 종료되면 콜백은 작업자를 대기열에 다시 삽입 할 수 있습니다. 이것은 UI를 업데이트하기에 좋은 장소입니다.
  5. UI는 활성 스레드, 총 진행률 및 청크 당 시간을 표시 할 수 있습니다. 사용자가 활성 스레드 수를 조정하거나 처리를 일시 중지하거나 오류를 표시하거나 조기에 중지 할 수 있습니다.
  6. 대기열 및 사전이 비어 있으면 완료됩니다.

콘솔 앱으로 데모 코드 :

using System;
using System.Collections.Generic;
using System.Threading;

namespace threadtest
{
    public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);

    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supv = new Supervisor();
            supv.LoadQueue();
            supv.Dispatch();
        }
    }

    public class Supervisor
    {
        public Queue<Worker> pendingWork = new Queue<Worker>();
        public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();

        private object pendingLock = new object();
        private object activeLock = new object();

        private int maxThreads = 200;

        public void LoadQueue()
        {
            for (int i = 0; i < 1000; i++)
            {
                Worker worker = new Worker();
                worker.Callback = new DoneCallbackDelegate(WorkerFinished);
                lock (pendingLock)
                {
                    pendingWork.Enqueue(worker);
                }
            }
        }

        public void Dispatch()
        {
            int activeThreadCount;

            while (true)
            {
                lock (activeLock) { activeThreadCount = activeWork.Count; }
                while (true)
                {
                    lock (activeLock)
                    {
                        if (activeWork.Count == maxThreads) break;
                    }
                    lock (pendingWork)
                    {
                        if (pendingWork.Count > 0)
                        {
                            Worker worker = pendingWork.Dequeue();
                            Thread thread = new Thread(new ThreadStart(worker.DoWork));
                            thread.IsBackground = true;
                            worker.ThreadId = thread.ManagedThreadId;
                            lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
                            thread.Start();
                        }
                        else
                        {
                            break;
                        }
                    }
                }
                Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)

                lock (pendingLock)
                    lock (activeLock)
                    {
                        if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
                    }
            }
        }

        // remove finished threads from activeWork, resubmit if necessary, and update UI
        public void WorkerFinished(int idArg, bool successArg, string messageArg)
        {
            lock (pendingLock)
                lock (activeLock)
                {
                    Worker worker = activeWork[idArg];
                    activeWork.Remove(idArg);
                    if (!successArg)
                    {
                        // check the message or something to see if you should resubmit thread
                        pendingWork.Enqueue(worker);
                    }
                    // update UI
                    int left = Console.CursorLeft;
                    int top = Console.CursorTop;
                    Console.WriteLine(string.Format("pending:{0} active:{1}        ", pendingWork.Count, activeWork.Count));
                    Console.SetCursorPosition(left, top);
                }
        }
    }

    public class Worker
    {
        // this is where you put in your problem-unique stuff
        public int ThreadId { get; set; }

        DoneCallbackDelegate callback;
        public DoneCallbackDelegate Callback { set { callback = value; } }

        public void DoWork()
        {
            try
            {
                Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
                callback(ThreadId, true, null);
            }
            catch (Exception ex)
            {
                callback(ThreadId, false, ex.ToString());
            }
        }
    }
}


아래 라이선스: CC-BY-SA with attribution
와 제휴하지 않음 Stack Overflow
이 KB는 합법적입니까? 예, 이유를 알아보십시오.
아래 라이선스: CC-BY-SA with attribution
와 제휴하지 않음 Stack Overflow
이 KB는 합법적입니까? 예, 이유를 알아보십시오.