Elaborazione di un file flat in blocchi utilizzando più thread utilizzando il modello produttore / consumatore e SqlBulkCopy nel DB di SQL Server

c# flat-file multithreading sqlbulkcopy sql-server

Domanda

Spero che tu possa sopportare con me. Volevo fornire quante più informazioni possibile. Il problema principale è come creare una struttura (come una pila) che verrà utilizzata da più thread che visualizzerà un valore e lo userà per elaborare un grande file flat e, possibilmente, ricomincerà a funzionare ripetutamente fino a quando non verrà elaborato l'intero file. Se un file ha 100.000 record che possono essere elaborati da 5 thread usando 2.000 row chunks, ciascun thread otterrà 10 blocchi da elaborare.

Il mio obiettivo è spostare i dati in un file flat (con Intestazione ... Sottotitolo ... Dettagli, Dettagli, Dettagli, ... Dettagli, Sottoparola, Sottotitoli ... Dettagli, Dettagli, Dettagli, ... Dettagli, Sottopiatti, Sottotitolo ... Dettaglio, Dettaglio, Dettaglio, ... Dettaglio, Subfooter, Struttura piè di pagina) nel DB OLTP che ha la modalità di ripristino su Semplice (possibile Completo) in 3 tabelle: 1a rappresenta la chiave unica del Sottotitolo presente nella riga del Sottotitolo, 2a una intermedia table SubheaderGroup, che rappresenta il raggruppamento di righe di dettaglio in blocchi di 2000 record (deve avere il PK di identità del sottotitolo come FK e il terzo rappresenta le righe di dettaglio con FK che punta al sottotitolo PK.

Sto facendo la gestione delle transazioni manuale poiché posso avere decine di migliaia di righe di dettaglio e sto usando un campo speciale che è impostato su 0 nelle tabelle di destinazione durante il caricamento e quindi alla fine dell'elaborazione del file sto facendo un aggiornamento transazionale cambiando questo valore a 1 che può segnalare altre applicazioni che il caricamento è terminato.

Voglio tagliare questo file flat in più pezzi uguali (stesso numero di righe) che possono essere elaborati con più thread e importati utilizzando SqlBulkCopy utilizzando IDataReader creato dai metadati della tabella di destinazione).

Voglio utilizzare il pattern produttore / consumatore (come spiegato nel link sottostante - analisi pdf e esempio di codice) per utilizzare SqlBulkCopy con l'opzione SqlBulkCopyOptions.TableLock. http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspx Questo modello consente di creare più produttori e il numero equivalente di consumatori deve abbonarsi ai produttori per consumare la riga.

Nel progetto TestSqlBulkCopy, il file DataProducer.cs contiene un metodo che simula la produzione di migliaia di record.

public void Produce (DataConsumer consumer, int numberOfRows) {
    int bufferSize = 100000;
    int numberOfBuffers = numberOfRows / bufferSize;

    for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
        DataTable buffer = consumer.GetBufferDataTable ();

        for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
            object[] values = GetRandomRow (consumer);
            buffer.Rows.Add (values);
        }
        consumer.AddBufferDataTable (buffer);
    }
}

Questo metodo verrà eseguito nel contesto di una nuova discussione. Voglio che questo nuovo thread legga solo un pezzo unico di file flat originale e un altro thread procederà all'elaborazione del blocco successivo. I consumatori quindi spostano i dati (che vengono pompati su di essi) nel database SQL Server utilizzando la classe ADO.NET di SqlBulkCopy.

Quindi la domanda qui riguarda il programma principale che stabilisce quale riga deve essere elaborata da lineTo per ogni thread e penso che dovrebbe accadere durante la creazione del thread. La seconda soluzione è probabilmente per i thread per condividere alcune strutture e utilizzare qualcosa di unico (come numero di thread o numero di sequenza) per cercare una struttura condivisa (possibilmente una pila e inserire un valore (bloccando uno stack mentre lo si fa) e quindi il thread successivo quindi preleva il valore successivo, il programma principale preleva il file flat e determina la dimensione dei blocchi e crea lo stack.

Quindi qualcuno può fornire alcuni frammenti di codice, pseudo cod su come più thread potrebbero elaborare un file e ottenere solo una porzione univoca di quel file?

Grazie, Rad

Risposta accettata

Quello che ha funzionato bene per me è usare una coda per conservare il lavoro non elaborato e un dizionario per tenere traccia del lavoro in volo:

  1. Crea una classe worker che accetta il nome file, la riga iniziale e il conteggio delle righe e dispone di un metodo di aggiornamento che inserisce i database. Passa un metodo di callback che l'operatore usa per segnalare quando viene eseguito.
  2. Carica una coda con istanze della classe worker, una per ogni blocco.
  3. Crea un thread di dispatcher che rimuove la coda di un'istanza di lavoro, avvia il suo metodo di aggiornamento e aggiunge l'istanza di lavoro in un dizionario, immesso dal suo ManagedThreadId del thread. Fallo finché non viene raggiunto il numero massimo consentito di thread, come indicato dal Dizionario. Il dispatcher attende fino a quando un thread finisce e poi ne lancia un altro. Ci sono molti modi per aspettare.
  4. Alla fine di ogni thread, il suo callback rimuove il suo ManagedThreadId dal Dizionario. Se il thread si interrompe a causa di un errore (come il timeout della connessione), la richiamata può reinserire il lavoratore nella coda. Questo è un buon posto per aggiornare l'interfaccia utente.
  5. La tua interfaccia utente può mostrare discussioni attive, progresso totale e tempo per blocco. Può consentire all'utente di regolare il numero di thread attivi, sospendere l'elaborazione, mostrare errori o fermarsi prima.
  6. Quando la coda e il dizionario sono vuoti, il gioco è fatto.

Codice demo come app per console:

using System;
using System.Collections.Generic;
using System.Threading;

namespace threadtest
{
    public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);

    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supv = new Supervisor();
            supv.LoadQueue();
            supv.Dispatch();
        }
    }

    public class Supervisor
    {
        public Queue<Worker> pendingWork = new Queue<Worker>();
        public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();

        private object pendingLock = new object();
        private object activeLock = new object();

        private int maxThreads = 200;

        public void LoadQueue()
        {
            for (int i = 0; i < 1000; i++)
            {
                Worker worker = new Worker();
                worker.Callback = new DoneCallbackDelegate(WorkerFinished);
                lock (pendingLock)
                {
                    pendingWork.Enqueue(worker);
                }
            }
        }

        public void Dispatch()
        {
            int activeThreadCount;

            while (true)
            {
                lock (activeLock) { activeThreadCount = activeWork.Count; }
                while (true)
                {
                    lock (activeLock)
                    {
                        if (activeWork.Count == maxThreads) break;
                    }
                    lock (pendingWork)
                    {
                        if (pendingWork.Count > 0)
                        {
                            Worker worker = pendingWork.Dequeue();
                            Thread thread = new Thread(new ThreadStart(worker.DoWork));
                            thread.IsBackground = true;
                            worker.ThreadId = thread.ManagedThreadId;
                            lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
                            thread.Start();
                        }
                        else
                        {
                            break;
                        }
                    }
                }
                Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)

                lock (pendingLock)
                    lock (activeLock)
                    {
                        if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
                    }
            }
        }

        // remove finished threads from activeWork, resubmit if necessary, and update UI
        public void WorkerFinished(int idArg, bool successArg, string messageArg)
        {
            lock (pendingLock)
                lock (activeLock)
                {
                    Worker worker = activeWork[idArg];
                    activeWork.Remove(idArg);
                    if (!successArg)
                    {
                        // check the message or something to see if you should resubmit thread
                        pendingWork.Enqueue(worker);
                    }
                    // update UI
                    int left = Console.CursorLeft;
                    int top = Console.CursorTop;
                    Console.WriteLine(string.Format("pending:{0} active:{1}        ", pendingWork.Count, activeWork.Count));
                    Console.SetCursorPosition(left, top);
                }
        }
    }

    public class Worker
    {
        // this is where you put in your problem-unique stuff
        public int ThreadId { get; set; }

        DoneCallbackDelegate callback;
        public DoneCallbackDelegate Callback { set { callback = value; } }

        public void DoWork()
        {
            try
            {
                Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
                callback(ThreadId, true, null);
            }
            catch (Exception ex)
            {
                callback(ThreadId, false, ex.ToString());
            }
        }
    }
}


Autorizzato sotto: CC-BY-SA with attribution
Non affiliato con Stack Overflow
È legale questo KB? Sì, impara il perché
Autorizzato sotto: CC-BY-SA with attribution
Non affiliato con Stack Overflow
È legale questo KB? Sì, impara il perché