Procesando un archivo plano en trozos usando múltiples subprocesos usando el patrón productor / consumidor y SqlBulkCopy en SQL Server DB

c# flat-file multithreading sqlbulkcopy sql-server

Pregunta

Espero que tengas paciencia conmigo. Quería proporcionar tanta información como pueda. El problema principal es cómo crear una estructura (como una pila) que será utilizada por varios subprocesos que mostrarán un valor y lo usarán para procesar un archivo plano grande y posiblemente realizar ciclos una y otra vez hasta que se procese todo el archivo. Si un archivo tiene 100.000 registros que pueden procesarse por 5 subprocesos utilizando 2.000 segmentos de fila, entonces cada subproceso obtendrá 10 fragmentos para procesar.

Mi objetivo es mover datos en un archivo plano (con Encabezado ... Subencabezado ... Detalle, Detalle, Detalle, ... Detalle, Submpezador, Subencabezado ... Detalle, Detalle, Detalle, ... Detalle, Submpaso, Subheader ... Detail, Detail, Detail, ... Detail, SubFooter, Footer structure) en OLTP DB que tiene el modo de recuperación a Simple (posible Completo) en 3 tablas: la primera representa la clave exclusiva de Subheader presente en la fila de Subheader, la segunda es una intermedia SubheaderGroup de la tabla, que representa la agrupación de filas de detalles en trozos de 2000 registros (debe tener la Identidad PK de Subheader como su FK y la tercera representa las filas de Detalles con FK apuntando a Subheader PK.

Estoy haciendo la gestión manual de transacciones ya que puedo tener decenas de miles de filas de Detalle y estoy usando un campo especial que se establece en 0 en las tablas de destino durante la carga y luego, al final del procesamiento del archivo, estoy haciendo una actualización transaccional cambiando esto Valor a 1 que puede indicar a otra aplicación que la carga finalizó.

Quiero cortar este archivo plano en varias partes iguales (el mismo número de filas) que se pueden procesar con múltiples hilos e importar usando SqlBulkCopy usando IDataReader que se crea a partir de metadatos de la tabla de Destino).

Quiero usar el patrón productor / consumidor (como se explica en el enlace a continuación: análisis de pdf y ejemplo de código) para usar la opción SqlBulkCopy con SqlBulkCopyOptions.TableLock. http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspx Este patrón permite crear múltiples productores y la cantidad equivalente de consumidores que necesitan suscribirse a los productores para consumir la fila.

En el proyecto TestSqlBulkCopy, archivo DataProducer.cs hay un método que simula la producción de miles de registros.

public void Produce (DataConsumer consumer, int numberOfRows) {
    int bufferSize = 100000;
    int numberOfBuffers = numberOfRows / bufferSize;

    for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
        DataTable buffer = consumer.GetBufferDataTable ();

        for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
            object[] values = GetRandomRow (consumer);
            buffer.Rows.Add (values);
        }
        consumer.AddBufferDataTable (buffer);
    }
}

Este método se ejecutará en el contexto de un nuevo hilo. Quiero que este nuevo hilo lea solo un fragmento único del archivo plano original y otro hilo comenzará a procesar el siguiente fragmento. Los consumidores luego moverían los datos (que se bombean a ellos) a la base de datos de SQL Server utilizando la clase ADO.NET de SqlBulkCopy.

Entonces, la pregunta aquí es sobre el programa principal que dicta qué línea de proceso debe ser procesada por cada subproceso y creo que debería suceder durante la creación del hilo. La segunda solución es probablemente para que los subprocesos compartan alguna estructura y utilicen algo único para ellos (como el número de subproceso o el número de secuencia) para buscar una estructura compartida (posiblemente una pila y un valor emergente (bloquear una pila mientras lo hace) y luego el siguiente subproceso luego seleccione el siguiente valor. El programa principal seleccionará el archivo plano y determinará el tamaño de los fragmentos y creará la pila.

Entonces, ¿alguien puede proporcionar algunos fragmentos de código, pseudo codificar cómo varios subprocesos procesarán un archivo y solo obtendrán una parte única de ese archivo?

Gracias rad

Respuesta aceptada

Lo que me ha funcionado bien es usar una cola para guardar el trabajo no procesado y un diccionario para realizar un seguimiento del trabajo en vuelo:

  1. Cree una clase de trabajador que tome el nombre de archivo, la línea de inicio y el recuento de líneas y tenga un método de actualización que inserte la base de datos. Pase un método de devolución de llamada que el trabajador utiliza para indicar cuándo se realiza.
  2. Cargue una cola con instancias de la clase de trabajo, una para cada fragmento.
  3. Generar un subproceso de despachador que saca de la cola una instancia de trabajador, inicia su método de actualización y agrega la instancia de obrero a un Diccionario, codificado por el ManagedThreadId del subproceso. Haga esto hasta que se alcance el número máximo permitido de hilos, como lo indica el Dictionary.Count. El despachador espera hasta que un hilo termina y luego lanza otro. Hay varias formas para que espere.
  4. A medida que finaliza cada subproceso, su devolución de llamada elimina su ManagedThreadId del Diccionario. Si el subproceso se cierra debido a un error (como el tiempo de espera de la conexión), la devolución de llamada puede reinsertar al trabajador en la cola. Este es un buen lugar para actualizar su interfaz de usuario.
  5. Su interfaz de usuario puede mostrar hilos activos, el progreso total y el tiempo por porción. Puede permitir al usuario ajustar el número de subprocesos activos, pausar el procesamiento, mostrar errores o detenerse antes.
  6. Cuando la cola y el diccionario están vacíos, ya está.

Código de demostración como aplicación de consola:

using System;
using System.Collections.Generic;
using System.Threading;

namespace threadtest
{
    public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);

    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supv = new Supervisor();
            supv.LoadQueue();
            supv.Dispatch();
        }
    }

    public class Supervisor
    {
        public Queue<Worker> pendingWork = new Queue<Worker>();
        public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();

        private object pendingLock = new object();
        private object activeLock = new object();

        private int maxThreads = 200;

        public void LoadQueue()
        {
            for (int i = 0; i < 1000; i++)
            {
                Worker worker = new Worker();
                worker.Callback = new DoneCallbackDelegate(WorkerFinished);
                lock (pendingLock)
                {
                    pendingWork.Enqueue(worker);
                }
            }
        }

        public void Dispatch()
        {
            int activeThreadCount;

            while (true)
            {
                lock (activeLock) { activeThreadCount = activeWork.Count; }
                while (true)
                {
                    lock (activeLock)
                    {
                        if (activeWork.Count == maxThreads) break;
                    }
                    lock (pendingWork)
                    {
                        if (pendingWork.Count > 0)
                        {
                            Worker worker = pendingWork.Dequeue();
                            Thread thread = new Thread(new ThreadStart(worker.DoWork));
                            thread.IsBackground = true;
                            worker.ThreadId = thread.ManagedThreadId;
                            lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
                            thread.Start();
                        }
                        else
                        {
                            break;
                        }
                    }
                }
                Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)

                lock (pendingLock)
                    lock (activeLock)
                    {
                        if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
                    }
            }
        }

        // remove finished threads from activeWork, resubmit if necessary, and update UI
        public void WorkerFinished(int idArg, bool successArg, string messageArg)
        {
            lock (pendingLock)
                lock (activeLock)
                {
                    Worker worker = activeWork[idArg];
                    activeWork.Remove(idArg);
                    if (!successArg)
                    {
                        // check the message or something to see if you should resubmit thread
                        pendingWork.Enqueue(worker);
                    }
                    // update UI
                    int left = Console.CursorLeft;
                    int top = Console.CursorTop;
                    Console.WriteLine(string.Format("pending:{0} active:{1}        ", pendingWork.Count, activeWork.Count));
                    Console.SetCursorPosition(left, top);
                }
        }
    }

    public class Worker
    {
        // this is where you put in your problem-unique stuff
        public int ThreadId { get; set; }

        DoneCallbackDelegate callback;
        public DoneCallbackDelegate Callback { set { callback = value; } }

        public void DoWork()
        {
            try
            {
                Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
                callback(ThreadId, true, null);
            }
            catch (Exception ex)
            {
                callback(ThreadId, false, ex.ToString());
            }
        }
    }
}


Licencia bajo: CC-BY-SA with attribution
No afiliado con Stack Overflow
¿Es esto KB legal? Sí, aprende por qué
Licencia bajo: CC-BY-SA with attribution
No afiliado con Stack Overflow
¿Es esto KB legal? Sí, aprende por qué