Обработка плоского файла в кусках с использованием нескольких потоков с использованием шаблона производителя / потребителя и SqlBulkCopy в SQL Server DB

c# flat-file multithreading sqlbulkcopy sql-server

Вопрос

Надеюсь, ты понесешь меня. Я хотел предоставить как можно больше информации. Основная проблема заключается в том, как создать структуру (например, стек), которая будет использоваться несколькими потоками, которые будут выставлять значение, и использовать его для обработки одного большого плоского файла и, возможно, цикличности снова и снова, пока весь файл не будет обработан. Если файл имеет 100 000 записей, которые могут обрабатываться 5 потоками с использованием 2 000 строк строк, каждый поток будет обрабатывать 10 кусков.

Моя цель состоит в том, чтобы перемещать данные в плоском файле (с заголовком ... Подзаголовок ... Подробно, Подробно, Подробно, ... Подробнее, SubFooter, Subheader ... Подробно, Подробно, Подробно, ... Подробно, SubFooter, Subheader ... Detail, Detail, Detail, ... Detail, SubFooter, Footer structure) в OLTP DB, который имеет режим восстановления до Simple (возможно Full) в 3 таблицы: 1-й представляет уникальный ключ Subheader, присутствующий в строке Subheader, второй промежуточный table SubheaderGroup, представляющая группировку подробных строк в кусках записей 2000 (требуется, чтобы идентификатор подклассификатора PK как его FK и третий представлял строки с FK, указывающие на PBC подзаголовка.

Я выполняю ручное управление транзакциями, так как у меня могут быть десятки тысяч строк Detail, и я использую специальное поле, которое установлено в 0 в таблицах назначения во время загрузки, а затем в конце обработки файла. Я делаю транзакционную проверку, изменяя это значение 1, которое может сигнализировать другим приложениям, что загрузка завершена.

Я хочу нарезать этот плоский файл на несколько равных частей (столько же строк), которые могут обрабатываться несколькими потоками и импортироваться с использованием SqlBulkCopy с использованием IDataReader, созданного из метаданных таблицы назначения).

Я хочу использовать шаблон производителя / потребителя (как описано в ссылке ниже - pdf-анализ и образец кода), чтобы использовать SqlBulkCopy с параметром SqlBulkCopyOptions.TableLock. http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspx Этот шаблон позволяет создавать несколько производителей, и эквивалентное количество потребителей должно подписаться на производителей, чтобы потреблять строку.

В проекте TestSqlBulkCopy, файле DataProducer.cs существует метод, который имитирует производство тысяч записей.

public void Produce (DataConsumer consumer, int numberOfRows) {
    int bufferSize = 100000;
    int numberOfBuffers = numberOfRows / bufferSize;

    for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
        DataTable buffer = consumer.GetBufferDataTable ();

        for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
            object[] values = GetRandomRow (consumer);
            buffer.Rows.Add (values);
        }
        consumer.AddBufferDataTable (buffer);
    }
}

Этот метод будет выполнен в контексте нового потока. Я хочу, чтобы этот новый поток читал только уникальный фрагмент исходного плоского файла, а другой поток будет обрабатывать следующий фрагмент. Затем потребители переместили данные (которые перекачиваются к ним) в базу данных SQL Server с использованием класса ADO.NET класса SqlBulkCopy.

Итак, вопрос здесь о главной программе, диктующей, что lineFrom to lineTo должен обрабатываться каждым потоком, и я думаю, что это должно произойти при создании потоков. Второе решение, вероятно, связано с тем, что потоки разделяют некоторую структуру и используют что-то уникальное для них (например, номер потока или порядковый номер) для поиска общей структуры (возможно, стек и всплывающее значение (блокировка стека при его выполнении), а затем следующий поток будет затем выберите следующее значение. Основная программа выберет в плоский файл и определит размер кусков и создаст стек.

Так может ли кто-нибудь предоставить некоторые фрагменты кода, псевдо-трески о том, как несколько потоков будут обрабатывать один файл и только получить уникальную часть этого файла?

Спасибо, Рад

Принятый ответ

То, что хорошо сработало для меня, - это использовать очередь для хранения необработанной работы и словаря для отслеживания работы в полете:

  1. Создайте рабочий класс, который принимает имя файла, строку начала и количество строк и имеет метод обновления, который вставляет базу данных. Передайте метод обратного вызова, который рабочий использует для подачи сигнала, когда он будет выполнен.
  2. Загрузите очередь с экземплярами рабочего класса, по одному для каждого фрагмента.
  3. Создайте поток диспетчера, который удаляет экземпляр рабочего объекта, запускает его метод обновления и добавляет рабочий экземпляр в словарь, связанный с ManagedThreadId его потока. Сделайте это до тех пор, пока не будет достигнут максимально допустимый расход нитей, как указано в Dictionary.Count. Диспетчер ждет, пока нить закончит, а затем запустит другую. Есть несколько способов подождать.
  4. По мере завершения каждого потока его обратный вызов удаляет свой ManagedThreadId из Словаря. Если поток завершает работу из-за ошибки (например, время ожидания соединения), тогда обратный вызов может повторно вставить рабочего в очередь. Это хорошее место для обновления вашего пользовательского интерфейса.
  5. Пользовательский интерфейс может отображать активные потоки, общий прогресс и время на кусок. Он может позволить пользователю настроить количество активных потоков, приостановить обработку, показать ошибки или остановиться раньше.
  6. Когда очередь и словарь пусты, вы закончили.

Демо-код в качестве консольного приложения:

using System;
using System.Collections.Generic;
using System.Threading;

namespace threadtest
{
    public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);

    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supv = new Supervisor();
            supv.LoadQueue();
            supv.Dispatch();
        }
    }

    public class Supervisor
    {
        public Queue<Worker> pendingWork = new Queue<Worker>();
        public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();

        private object pendingLock = new object();
        private object activeLock = new object();

        private int maxThreads = 200;

        public void LoadQueue()
        {
            for (int i = 0; i < 1000; i++)
            {
                Worker worker = new Worker();
                worker.Callback = new DoneCallbackDelegate(WorkerFinished);
                lock (pendingLock)
                {
                    pendingWork.Enqueue(worker);
                }
            }
        }

        public void Dispatch()
        {
            int activeThreadCount;

            while (true)
            {
                lock (activeLock) { activeThreadCount = activeWork.Count; }
                while (true)
                {
                    lock (activeLock)
                    {
                        if (activeWork.Count == maxThreads) break;
                    }
                    lock (pendingWork)
                    {
                        if (pendingWork.Count > 0)
                        {
                            Worker worker = pendingWork.Dequeue();
                            Thread thread = new Thread(new ThreadStart(worker.DoWork));
                            thread.IsBackground = true;
                            worker.ThreadId = thread.ManagedThreadId;
                            lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
                            thread.Start();
                        }
                        else
                        {
                            break;
                        }
                    }
                }
                Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)

                lock (pendingLock)
                    lock (activeLock)
                    {
                        if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
                    }
            }
        }

        // remove finished threads from activeWork, resubmit if necessary, and update UI
        public void WorkerFinished(int idArg, bool successArg, string messageArg)
        {
            lock (pendingLock)
                lock (activeLock)
                {
                    Worker worker = activeWork[idArg];
                    activeWork.Remove(idArg);
                    if (!successArg)
                    {
                        // check the message or something to see if you should resubmit thread
                        pendingWork.Enqueue(worker);
                    }
                    // update UI
                    int left = Console.CursorLeft;
                    int top = Console.CursorTop;
                    Console.WriteLine(string.Format("pending:{0} active:{1}        ", pendingWork.Count, activeWork.Count));
                    Console.SetCursorPosition(left, top);
                }
        }
    }

    public class Worker
    {
        // this is where you put in your problem-unique stuff
        public int ThreadId { get; set; }

        DoneCallbackDelegate callback;
        public DoneCallbackDelegate Callback { set { callback = value; } }

        public void DoWork()
        {
            try
            {
                Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
                callback(ThreadId, true, null);
            }
            catch (Exception ex)
            {
                callback(ThreadId, false, ex.ToString());
            }
        }
    }
}


Лицензировано согласно: CC-BY-SA with attribution
Не связан с Stack Overflow
Является ли этот КБ законным? Да, узнайте, почему
Лицензировано согласно: CC-BY-SA with attribution
Не связан с Stack Overflow
Является ли этот КБ законным? Да, узнайте, почему