Verarbeiten einer flachen Datei in Chunks mithilfe mehrerer Threads mithilfe des Producer / Consumer-Musters und SqlBulkCopy in der SQL Server-DB

c# flat-file multithreading sqlbulkcopy sql-server

Frage

Ich hoffe, du wirst es mit mir ertragen. Ich wollte so viele Informationen wie möglich zur Verfügung stellen. Das Hauptproblem besteht darin, wie man eine Struktur (wie einen Stapel) erstellt, die von mehreren Threads verwendet wird, die einen Wert ausgeben und damit eine große flache Datei verarbeiten und möglicherweise immer wieder zyklisch arbeiten, bis die gesamte Datei verarbeitet ist. Wenn eine Datei 100.000 Datensätze hat, die von 5 Threads mit 2.000 Zeilen-Chunks verarbeitet werden können, erhält jeder Thread 10 Chunks zur Verarbeitung.

Mein Ziel ist es, Daten in einer flachen Datei zu verschieben (mit Header ... Subheader ... Detail, Detail, Detail, ... Detail, SubFooter, Subheader ... Detail, Detail, Detail, ... Detail, SubFooter, Subheader ... Detail, Detail, Detail, ... Detail, SubFooter, Footer-Struktur) in OLTP DB, die den Recovery-Modus auf Simple (möglich Full) in 3 Tabellen hat: 1. Subheader-eindeutigen Schlüssel in Subheader-Zeile, 2. eine Zwischenstufe Tabelle SubheaderGroup, die die Gruppierung von Detailzeilen in Blöcken von 2000 Datensätzen darstellt (benötigt die Identity PK der Subheader als FK und die dritte Detailzeile mit FK, die auf Subheader PK verweist.

Ich mache manuelle Transaktionsverwaltung, da ich Zehntausende von Detailzeilen haben kann, und ich verwende ein spezielles Feld, das in Zieltabellen während des Ladens auf 0 gesetzt wird, und dann am Ende der Dateiverarbeitung mache ich eine transaktionale Aktualisierung, die dies ändert Wert auf 1, der anderen Anwendungen signalisieren kann, dass das Laden beendet ist.

Ich möchte diese flache Datei in mehrere gleiche Teile zerhacken (die gleiche Anzahl von Zeilen), die mit mehreren Threads verarbeitet werden können und mit SqlBulkCopy mit IDataReader importiert werden können, das aus Metadaten der Zieltabelle erstellt wird.

Ich möchte das Producer / Consumer-Muster verwenden (wie unten im Link erklärt - PDF-Analyse und Codebeispiel), um SqlBulkCopy mit der SqlBulkCopyOptions.TableLock-Option zu verwenden. http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspx Dieses Muster ermöglicht die Erstellung mehrerer Produzenten und die entsprechende Anzahl von Konsumenten müssen Produzenten abonnieren, um die Reihe zu konsumieren.

Im TestSqlBulkCopy-Projekt, DataProducer.cs-Datei gibt es eine Methode, die die Produktion von Tausenden von Datensätzen simuliert.

public void Produce (DataConsumer consumer, int numberOfRows) {
    int bufferSize = 100000;
    int numberOfBuffers = numberOfRows / bufferSize;

    for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
        DataTable buffer = consumer.GetBufferDataTable ();

        for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
            object[] values = GetRandomRow (consumer);
            buffer.Rows.Add (values);
        }
        consumer.AddBufferDataTable (buffer);
    }
}

Diese Methode wird im Kontext eines neuen Threads ausgeführt. Ich möchte, dass dieser neue Thread nur einen eindeutigen Teil der ursprünglichen Flat-Datei liest und ein anderer Thread den nächsten Teil verarbeitet. Verbraucher würden dann Daten (die zu ihnen gepumpt werden) in die SQL Server-Datenbank mit der SqlBulkCopy ADO.NET-Klasse verschieben.

Also die Frage hier ist über Hauptprogramm diktieren, welche Zeile von LineTo sollte von jedem Thread verarbeitet werden, und ich denke, dass sollte während der Erstellung von Threads passieren. Die zweite Lösung ist wahrscheinlich, dass Threads eine Struktur teilen und etwas Einzigartiges verwenden (wie Thread-Nummer oder Sequenznummer), um eine gemeinsame Struktur zu suchen (möglicherweise einen Stack und einen Wert zu knacken und einen nächsten Stack zu fixieren) Nehmen Sie dann den nächsten Wert. Das Hauptprogramm wird in die flache Datei aufnehmen und die Größe der Stücke bestimmen und den Stapel erstellen.

Kann also jemand Code-Snippets bereitstellen, Pseudo-Code darüber, wie mehrere Threads eine Datei verarbeiten und nur einen eindeutigen Teil dieser Datei erhalten?

Danke, Rad

Akzeptierte Antwort

Was für mich gut funktioniert hat, ist eine Warteschlange zu verwenden, um unverarbeitete Arbeit zu halten und ein Wörterbuch, um die Arbeit während des Fluges zu verfolgen:

  1. Erstellen Sie eine Worker-Klasse, die den Dateinamen, die Startlinie und die Zeilenanzahl verwendet, und über eine Aktualisierungsmethode verfügt, die die Datenbank einfügt. Übergeben Sie eine Callback-Methode, die der Worker verwendet, um zu signalisieren, wann er fertig ist.
  2. Laden Sie eine Warteschlange mit Instanzen der Worker-Klasse, eine für jeden Chunk.
  3. Spawn eines Dispatcher-Threads, der eine Worker-Instanz aus der Warteschlange nimmt, ihre Update-Methode startet und die Worker-Instanz in ein Dictionary einfügt, das durch die ManagedThreadId-Eigenschaft des Threads verschlüsselt wird. Tun Sie dies, bis Ihre maximal zulässige Anzahl von Threads erreicht ist, wie von Dictionary.Count angegeben. Der Dispatcher wartet bis ein Thread beendet ist und startet dann einen anderen. Es gibt mehrere Möglichkeiten, darauf zu warten.
  4. Wenn jeder Thread beendet ist, entfernt sein Rückruf seine ManagedThreadId aus dem Dictionary. Wenn der Thread aufgrund eines Fehlers beendet wird (z. B. Verbindungstimeout), kann der Rückruf den Worker erneut in die Warteschlange einfügen. Dies ist ein guter Ort, um Ihre Benutzeroberfläche zu aktualisieren.
  5. Ihre Benutzeroberfläche kann aktive Threads, den Gesamtfortschritt und die Zeit pro Chunk anzeigen. Der Benutzer kann die Anzahl der aktiven Threads anpassen, die Verarbeitung anhalten, Fehler anzeigen oder vorzeitig beenden.
  6. Wenn die Warteschlange und das Wörterbuch leer sind, sind Sie fertig.

Demo-Code als Konsolen-App:

using System;
using System.Collections.Generic;
using System.Threading;

namespace threadtest
{
    public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);

    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supv = new Supervisor();
            supv.LoadQueue();
            supv.Dispatch();
        }
    }

    public class Supervisor
    {
        public Queue<Worker> pendingWork = new Queue<Worker>();
        public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();

        private object pendingLock = new object();
        private object activeLock = new object();

        private int maxThreads = 200;

        public void LoadQueue()
        {
            for (int i = 0; i < 1000; i++)
            {
                Worker worker = new Worker();
                worker.Callback = new DoneCallbackDelegate(WorkerFinished);
                lock (pendingLock)
                {
                    pendingWork.Enqueue(worker);
                }
            }
        }

        public void Dispatch()
        {
            int activeThreadCount;

            while (true)
            {
                lock (activeLock) { activeThreadCount = activeWork.Count; }
                while (true)
                {
                    lock (activeLock)
                    {
                        if (activeWork.Count == maxThreads) break;
                    }
                    lock (pendingWork)
                    {
                        if (pendingWork.Count > 0)
                        {
                            Worker worker = pendingWork.Dequeue();
                            Thread thread = new Thread(new ThreadStart(worker.DoWork));
                            thread.IsBackground = true;
                            worker.ThreadId = thread.ManagedThreadId;
                            lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
                            thread.Start();
                        }
                        else
                        {
                            break;
                        }
                    }
                }
                Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)

                lock (pendingLock)
                    lock (activeLock)
                    {
                        if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
                    }
            }
        }

        // remove finished threads from activeWork, resubmit if necessary, and update UI
        public void WorkerFinished(int idArg, bool successArg, string messageArg)
        {
            lock (pendingLock)
                lock (activeLock)
                {
                    Worker worker = activeWork[idArg];
                    activeWork.Remove(idArg);
                    if (!successArg)
                    {
                        // check the message or something to see if you should resubmit thread
                        pendingWork.Enqueue(worker);
                    }
                    // update UI
                    int left = Console.CursorLeft;
                    int top = Console.CursorTop;
                    Console.WriteLine(string.Format("pending:{0} active:{1}        ", pendingWork.Count, activeWork.Count));
                    Console.SetCursorPosition(left, top);
                }
        }
    }

    public class Worker
    {
        // this is where you put in your problem-unique stuff
        public int ThreadId { get; set; }

        DoneCallbackDelegate callback;
        public DoneCallbackDelegate Callback { set { callback = value; } }

        public void DoWork()
        {
            try
            {
                Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
                callback(ThreadId, true, null);
            }
            catch (Exception ex)
            {
                callback(ThreadId, false, ex.ToString());
            }
        }
    }
}


Lizenziert unter: CC-BY-SA with attribution
Nicht verbunden mit Stack Overflow
Ist diese KB legal? Ja, lerne warum
Lizenziert unter: CC-BY-SA with attribution
Nicht verbunden mit Stack Overflow
Ist diese KB legal? Ja, lerne warum