使用生产者/消费者模式和SqlBulkCopy使用多个线程处理SQL文件数据块中的平面文件

c# flat-file multithreading sqlbulkcopy sql-server

我希望你能忍受我。我想提供尽可能多的信息。主要问题是如何创建一个将由多个线程使用的结构(如堆栈),该线程将弹出一个值并使用它来处理一个大的平面文件,并可能一次又一次地循环,直到处理完整个文件。如果一个文件有100.000条记录,可以由5个线程使用2.000行块处理,那么每个线程将获得10个块进行处理。

我的目标是将数据移动到平面文件中(带标题...副标题...详细信息,详细信息,详细信息,...详细信息,子图表,子标题...详细信息,详细信息,详细信息,...详细信息,子图表, Subheader ...详细信息,详细信息,...详细信息,SubFooter,页脚结构)进入OLTP DB,恢复模式为简单(可能为Full)为3个表:1st表示Subheader的唯一键存在于Subheader行,2nd表示中间表SubheaderGroup,表示以2000个记录的块为单位对细节行进行分组(需要将Subheader的Identity PK作为其FK,第3个表示具有FK指向Subheader PK的Detail行。

我正在进行手动事务管理,因为我可以拥有数万个Detail行,并且我在加载期间使用在目标表中设置为0的特殊字段,然后在文件处理结束时我正在进行事务更新值为1,可以指示其他应用程序已完成加载。

我想将这个平面文件切割成多个相等的部分(相同的行数),这些部分可以使用多个线程进行处理,并使用从目标表元数据创建的IDataReader使用SqlBulkCopy导入。

我想使用生产者/消费者模式(如下面的链接 - pdf分析和代码示例中所述)将SqlBulkCopy与SqlBulkCopyOptions.TableLock选项一起使用。 http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspx此模式允许创建多个生产者,并且相同数量的消费者需要订阅生产者来使用该行。

在TestSqlBulkCopy项目中,DataProducer.cs文件有一个模拟数千条记录生成的方法。

public void Produce (DataConsumer consumer, int numberOfRows) {
    int bufferSize = 100000;
    int numberOfBuffers = numberOfRows / bufferSize;

    for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
        DataTable buffer = consumer.GetBufferDataTable ();

        for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
            object[] values = GetRandomRow (consumer);
            buffer.Rows.Add (values);
        }
        consumer.AddBufferDataTable (buffer);
    }
}

此方法将在新线程的上下文中执行。我希望这个新线程只读取一个独特的原始平面文件块,另一个线程将继续处理下一个块。然后,消费者将使用SqlBulkCopy ADO.NET类将数据(通过它们泵送到SQL Server DB)移动到SQL Server DB。

所以这里的问题是关于主程序规定每个线程应该处理lineFrom to lineTo,我认为这应该在线程创建期间发生。第二个解决方案可能是线程共享一些结构并使用它们独有的东西(如线程号或序列号)来查找共享结构(可能是一个堆栈并弹出一个值(在执行时锁定一个堆栈)然后下一个线程将然后拾取下一个值。主程序将选择平面文件并确定块的大小并创建堆栈。

那么有人可以提供一些代码片段,关于多个线程如何处理一个文件并且只获得该文件的唯一部分的伪鳕鱼?

谢谢,Rad

一般承认的答案

对我来说有用的是使用队列来保存未处理的工作和字典来跟踪正在进行的工作:

  1. 创建一个包含文件名,起始行和行计数的工作类,并具有执行数据库插入的更新方法。传递一个回调方法,工作者在完成时用它来发出信号。
  2. 使用worker类的实例加载一个Queue,每个chunk一个。
  3. 生成一个调度程序线程,该线程使工作程序实例出列,启动其更新方法,并将工作程序实例添加到字典中,该字符串由其线程的ManagedThreadId键入。执行此操作,直到达到最大允许线程数,如Dictionary.Count所示。调度程序等待一个线程完成然后启动另一个线程。它有几种方法可以等待。
  4. 每个线程完成后,其回调将从Dictionary中删除其ManagedThreadId。如果线程因错误而退出(例如连接超时),则回调可以将worker重新插入Queue。这是更新UI的好地方。
  5. 您的UI可以显示活动线程,总进度和每个块的时间。它可以让用户调整活动线程数,暂停处理,显示错误或提前停止。
  6. 当队列和词典为空时,你就完成了。

演示代码作为控制台应用程序:

using System;
using System.Collections.Generic;
using System.Threading;

namespace threadtest
{
    public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);

    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supv = new Supervisor();
            supv.LoadQueue();
            supv.Dispatch();
        }
    }

    public class Supervisor
    {
        public Queue<Worker> pendingWork = new Queue<Worker>();
        public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();

        private object pendingLock = new object();
        private object activeLock = new object();

        private int maxThreads = 200;

        public void LoadQueue()
        {
            for (int i = 0; i < 1000; i++)
            {
                Worker worker = new Worker();
                worker.Callback = new DoneCallbackDelegate(WorkerFinished);
                lock (pendingLock)
                {
                    pendingWork.Enqueue(worker);
                }
            }
        }

        public void Dispatch()
        {
            int activeThreadCount;

            while (true)
            {
                lock (activeLock) { activeThreadCount = activeWork.Count; }
                while (true)
                {
                    lock (activeLock)
                    {
                        if (activeWork.Count == maxThreads) break;
                    }
                    lock (pendingWork)
                    {
                        if (pendingWork.Count > 0)
                        {
                            Worker worker = pendingWork.Dequeue();
                            Thread thread = new Thread(new ThreadStart(worker.DoWork));
                            thread.IsBackground = true;
                            worker.ThreadId = thread.ManagedThreadId;
                            lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
                            thread.Start();
                        }
                        else
                        {
                            break;
                        }
                    }
                }
                Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)

                lock (pendingLock)
                    lock (activeLock)
                    {
                        if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
                    }
            }
        }

        // remove finished threads from activeWork, resubmit if necessary, and update UI
        public void WorkerFinished(int idArg, bool successArg, string messageArg)
        {
            lock (pendingLock)
                lock (activeLock)
                {
                    Worker worker = activeWork[idArg];
                    activeWork.Remove(idArg);
                    if (!successArg)
                    {
                        // check the message or something to see if you should resubmit thread
                        pendingWork.Enqueue(worker);
                    }
                    // update UI
                    int left = Console.CursorLeft;
                    int top = Console.CursorTop;
                    Console.WriteLine(string.Format("pending:{0} active:{1}        ", pendingWork.Count, activeWork.Count));
                    Console.SetCursorPosition(left, top);
                }
        }
    }

    public class Worker
    {
        // this is where you put in your problem-unique stuff
        public int ThreadId { get; set; }

        DoneCallbackDelegate callback;
        public DoneCallbackDelegate Callback { set { callback = value; } }

        public void DoWork()
        {
            try
            {
                Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
                callback(ThreadId, true, null);
            }
            catch (Exception ex)
            {
                callback(ThreadId, false, ex.ToString());
            }
        }
    }
}


许可下: CC-BY-SA with attribution
不隶属于 Stack Overflow
这个KB合法吗? 是的,了解原因
许可下: CC-BY-SA with attribution
不隶属于 Stack Overflow
这个KB合法吗? 是的,了解原因