Traitement d'un fichier plat en morceaux utilisant plusieurs threads à l'aide du modèle producteur / consommateur et de SqlBulkCopy dans la base de données SQL Server

c# flat-file multithreading sqlbulkcopy sql-server

Question

J'espère que vous allez supporter avec moi. Je voulais fournir autant d'informations que possible. Le principal problème est de savoir comment créer une structure (comme une pile) qui sera utilisée par plusieurs threads pour afficher une valeur et l'utiliser pour traiter un gros fichier plat et éventuellement effectuer des cycles encore et encore jusqu'à ce que le fichier entier soit traité. Si un fichier contient 100 000 enregistrements pouvant être traités par 5 threads à l'aide de 2 000 fragments de ligne, chaque thread devra traiter 10 morceaux.

Mon objectif est de déplacer des données dans un fichier plat (avec en-tête ... Sous-en-tête ... Détail, Détail, Détail, ... Détail, Sous-Footer, Sous-tête ... Détail, Détail, Détail, ... Détail, SubFooter, Sous en-tête ... (détail, détail, détail, ... détail, sous-tireur, structure de bas de page) dans la base de données OLTP qui a le mode de récupération sur Simple (possible en mode plein) dans 3 tables: la première représente la clé unique de la sous-tête présente dans la ligne de sous-en-tête, le deuxième est un intermédiaire table SubheaderGroup, représentant le regroupement des lignes de détail dans des morceaux de 2 000 enregistrements (doit avoir une clé d'identité du sous-en-tête comme FK et une 3ème représentant des lignes de détail avec FK pointant sur une clé d'en-tête.

Je suis en train de gérer manuellement les transactions car je peux avoir des dizaines de milliers de lignes Detail et j'utilise un champ spécial qui est défini sur 0 dans les tables de destination pendant le chargement, puis à la fin du traitement du fichier, je fais une modification transactionnelle. valeur à 1 qui peut signaler à une autre application que le chargement est terminé.

Je souhaite découper ce fichier à plat en plusieurs parties égales (même nombre de lignes) pouvant être traitées avec plusieurs threads et importées à l'aide de SqlBulkCopy à l'aide d'IDataReader créé à partir des métadonnées de la table Destination).

Je veux utiliser le modèle producteur / consommateur (comme expliqué dans le lien ci-dessous - Analyse pdf et exemple de code) pour utiliser SqlBulkCopy avec l'option SqlBulkCopyOptions.TableLock. http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspx Ce modèle permet de créer plusieurs producteurs et un nombre équivalent de consommateurs doivent s'abonner aux producteurs pour consommer la ligne.

Dans le projet TestSqlBulkCopy, le fichier DataProducer.cs, il existe une méthode qui simule la production de milliers d'enregistrements.

public void Produce (DataConsumer consumer, int numberOfRows) {
    int bufferSize = 100000;
    int numberOfBuffers = numberOfRows / bufferSize;

    for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
        DataTable buffer = consumer.GetBufferDataTable ();

        for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
            object[] values = GetRandomRow (consumer);
            buffer.Rows.Add (values);
        }
        consumer.AddBufferDataTable (buffer);
    }
}

Cette méthode sera exécutée dans le contexte d'un nouveau thread. Je veux que ce nouveau fil ne lise qu’un bloc unique de fichier plat original et un autre thread lancera le traitement du bloc suivant. Les consommateurs déplaceraient ensuite les données (qui y sont pompées) vers la base de données SQL Server à l'aide de la classe SqlBulkCopy ADO.NET.

La question ici est donc de savoir si le programme principal dicte quelle ligne doit être traitée par chaque thread et je pense que cela devrait se produire lors de la création du thread. La deuxième solution est probablement que les threads partagent une structure et utilisent quelque chose qui leur est propre (comme un numéro de thread ou un numéro de séquence) pour rechercher une structure partagée (éventuellement une pile et extraire une valeur (verrouiller une pile en le faisant)), puis le prochain thread puis prenez la valeur suivante.Le programme principal prendra dans le fichier plat et déterminera la taille des morceaux et créera la pile.

Ainsi, quelqu'un peut-il fournir des extraits de code, un pseudo-code sur la façon dont plusieurs threads traitent un fichier et obtiennent uniquement une partie unique de ce fichier?

Merci Rad

Réponse acceptée

Ce qui a bien fonctionné pour moi, c’est d’utiliser une file d’attente pour les travaux non traités et un dictionnaire pour suivre les travaux en vol:

  1. Créez une classe de travail qui prend le nom de fichier, la ligne de départ et le nombre de lignes et possède une méthode de mise à jour qui insère la base de données. Passez une méthode de rappel que le travailleur utilise pour signaler quand c'est fait.
  2. Chargez une file d'attente avec des instances de la classe de travail, une pour chaque morceau.
  3. Créez un thread de distributeur qui met en file d'attente une instance de travail, lance sa méthode de mise à jour et ajoute l'instance de travail dans un dictionnaire, indexé par ManagedThreadId de son fil Faites-le jusqu'à ce que votre nombre de threads maximum autorisé soit atteint, comme indiqué par Dictionary.Count. Le répartiteur attend la fin d'un thread, puis en lance un autre. Il y a plusieurs façons d'attendre.
  4. À la fin de chaque thread, son rappel supprime son ManagedThreadId du dictionnaire. Si le thread se ferme à cause d'une erreur (telle que le délai de connexion), le rappel peut réinsérer le travailleur dans la file d'attente. C'est un bon endroit pour mettre à jour votre interface utilisateur.
  5. Votre interface utilisateur peut afficher les threads actifs, la progression totale et le temps par morceau. Il peut permettre à l'utilisateur de régler le nombre de threads actifs, de suspendre le traitement, d'afficher les erreurs ou d'arrêter tôt.
  6. Lorsque la file d'attente et le dictionnaire sont vides, vous avez terminé.

Code de démonstration en tant qu'application console:

using System;
using System.Collections.Generic;
using System.Threading;

namespace threadtest
{
    public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);

    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supv = new Supervisor();
            supv.LoadQueue();
            supv.Dispatch();
        }
    }

    public class Supervisor
    {
        public Queue<Worker> pendingWork = new Queue<Worker>();
        public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();

        private object pendingLock = new object();
        private object activeLock = new object();

        private int maxThreads = 200;

        public void LoadQueue()
        {
            for (int i = 0; i < 1000; i++)
            {
                Worker worker = new Worker();
                worker.Callback = new DoneCallbackDelegate(WorkerFinished);
                lock (pendingLock)
                {
                    pendingWork.Enqueue(worker);
                }
            }
        }

        public void Dispatch()
        {
            int activeThreadCount;

            while (true)
            {
                lock (activeLock) { activeThreadCount = activeWork.Count; }
                while (true)
                {
                    lock (activeLock)
                    {
                        if (activeWork.Count == maxThreads) break;
                    }
                    lock (pendingWork)
                    {
                        if (pendingWork.Count > 0)
                        {
                            Worker worker = pendingWork.Dequeue();
                            Thread thread = new Thread(new ThreadStart(worker.DoWork));
                            thread.IsBackground = true;
                            worker.ThreadId = thread.ManagedThreadId;
                            lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
                            thread.Start();
                        }
                        else
                        {
                            break;
                        }
                    }
                }
                Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)

                lock (pendingLock)
                    lock (activeLock)
                    {
                        if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
                    }
            }
        }

        // remove finished threads from activeWork, resubmit if necessary, and update UI
        public void WorkerFinished(int idArg, bool successArg, string messageArg)
        {
            lock (pendingLock)
                lock (activeLock)
                {
                    Worker worker = activeWork[idArg];
                    activeWork.Remove(idArg);
                    if (!successArg)
                    {
                        // check the message or something to see if you should resubmit thread
                        pendingWork.Enqueue(worker);
                    }
                    // update UI
                    int left = Console.CursorLeft;
                    int top = Console.CursorTop;
                    Console.WriteLine(string.Format("pending:{0} active:{1}        ", pendingWork.Count, activeWork.Count));
                    Console.SetCursorPosition(left, top);
                }
        }
    }

    public class Worker
    {
        // this is where you put in your problem-unique stuff
        public int ThreadId { get; set; }

        DoneCallbackDelegate callback;
        public DoneCallbackDelegate Callback { set { callback = value; } }

        public void DoWork()
        {
            try
            {
                Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
                callback(ThreadId, true, null);
            }
            catch (Exception ex)
            {
                callback(ThreadId, false, ex.ToString());
            }
        }
    }
}


Sous licence: CC-BY-SA with attribution
Non affilié à Stack Overflow
Est-ce KB légal? Oui, apprenez pourquoi
Sous licence: CC-BY-SA with attribution
Non affilié à Stack Overflow
Est-ce KB légal? Oui, apprenez pourquoi