Performance issue with SqlBulkCopy and DataTable

c# datatable performance sqlbulkcopy streamreader

Question

I need to efficiently import a large amount of data from a file into a database. I have a few RRF files containing that data; a single file can be larger than 400 MB, and eventually more than 2 million records may go from a file into the database.

What I did:

  1. I read the needed records into a DataTable.

    using (StreamReader streamReader = new StreamReader(filePath))
    {
        while (!streamReader.EndOfStream)
        {
            // Each line holds one record with '|'-separated fields.
            string[] values = streamReader.ReadLine().Split('|');
    
            int index = 0;
            var dataRow = dataTable.NewRow();
    
            foreach (var value in values)
            {
                dataRow[index] = value;
                index++;
            }
    
            dataTable.Rows.Add(dataRow);
        }
    }
    
  2. Then, within a transaction (this is a critical point), I insert the data from the DataTable into the database via SqlBulkCopy.

    var bcp = new SqlBulkCopy(_sqlConnection, SqlBulkCopyOptions.Default, transaction);
    bcp.DestinationTableName = tableName;          
    bcp.WriteToServer(dataTable);
    

The problem is that, since each DataTable can contain more than 2 million records, it takes too much RAM (around 2 GB) to store the DataTable.

Things like

dataTable.Dispose();
dataTable = null;

or

GC.Collect();
GC.SuppressFinalize();

do not actually help.

The BatchSize property of SqlBulkCopy has nothing to do with it; all the memory is taken by the DataTable, which stores the rows to be inserted.

Is there an efficient way to read the data and use SqlBulkCopy with it?

Popular Answer

In my experience, the optimal DataTable size for bulk inserting is somewhere between 60,000 and 100,000 rows. I also found reusing a DataTable to be slower than cloning a fresh one. DataTable.Rows.Clear() doesn't clear the constraints, and adding new rows is much slower after the first bulk insert. DataTable.Clear() is much better, but starting with a fresh DataTable for each bulk insert was the fastest.

So your code would look like:

int batchSize = 65000;
bool lastLine = streamReader.EndOfStream;

if (dataTable.Rows.Count == batchSize || lastLine) {
    // do bulk insert
    DataTable temp = dataTable.Clone();
    dataTable.Dispose();
    dataTable = temp;
}
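
To show where that check sits in the read loop, here is a minimal sketch of the whole batching pass. The names `BatchedImport`, `Import` and the `flush` callback are made up for illustration; in the real import, `flush` would be something like `table => bcp.WriteToServer(table)` running inside the transaction.

```csharp
using System;
using System.Data;
using System.IO;

public static class BatchedImport
{
    // Reads '|'-separated lines and passes a DataTable to `flush` every
    // `batchSize` rows, plus once more for any remainder.
    // `flush` is a hypothetical callback standing in for the bulk insert.
    // Returns the total number of rows read.
    public static int Import(TextReader reader, DataTable schema,
                             int batchSize, Action<DataTable> flush)
    {
        int total = 0;
        DataTable batch = schema.Clone(); // same columns, no rows
        string line;

        while ((line = reader.ReadLine()) != null)
        {
            string[] values = line.Split('|');
            DataRow row = batch.NewRow();

            // Ignore extra fields, e.g. the empty one after a trailing '|'.
            for (int i = 0; i < values.Length && i < batch.Columns.Count; i++)
                row[i] = values[i];

            batch.Rows.Add(row);
            total++;

            if (batch.Rows.Count == batchSize)
            {
                flush(batch);           // assumed to finish before returning
                batch.Dispose();
                batch = schema.Clone(); // fresh table for the next batch
            }
        }

        if (batch.Rows.Count > 0)
            flush(batch);               // last, partially filled batch
        batch.Dispose();

        return total;
    }
}
```

With this shape, only one batch of rows is held in memory at a time. If `flush` hands the table to another thread instead of inserting it synchronously, the `Dispose()` call after it would have to move to the consumer.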

In addition to that, you can separate the bulk inserting into its own thread. So your file reading thread would produce DataTable objects that your bulk insert thread would consume. You would have to add semaphores to make sure your file reading thread doesn't over-produce, otherwise you will use too much memory.

Here's an example of the produce/consume code. Feel free to make improvements to it.

You can play around with the sleep times to see how the code waits either on the producer side or the consumer side.

public static void produce() {

    DataObject o2 = new DataObject();
    Thread t = new Thread(consume);
    t.Start(o2);

    for (int i = 0; i < 10; i++) {
        if (o2.queue.Count > 2) {
            lock(o2.sb)
                o2.sb.AppendLine("3 items read, waiting for consume to finish");

            o2.wait.Set();
            o2.parentWait.WaitOne();
            o2.parentWait.Reset();
        }

        Thread.Sleep(500); // simulate reading data

        lock(o2.sb)
            o2.sb.AppendLine("Read file: " + i);

        lock(o2.queue) {
            o2.queue.Add(i);
        }
        o2.wait.Set();
    }

    o2.finished = true;
    o2.wait.Set();
}

public class DataObject {
    public volatile bool finished = false; // written by produce, read by consume
    public List<int> queue = new List<int>();
    public ManualResetEvent wait = new ManualResetEvent(false);
    public ManualResetEvent parentWait = new ManualResetEvent(false);
    public StringBuilder sb = new StringBuilder();
}

public static void consume(Object o) {
    DataObject o2 = (DataObject) o;

    while (true) {
        if (o2.finished && o2.queue.Count == 0)
            break;

        if (o2.queue.Count == 0) {
            lock(o2.sb)
                o2.sb.AppendLine("Nothing in queue, waiting for produce.");
            o2.wait.WaitOne();
            o2.wait.Reset();
            continue; // re-check the queue and the finished flag before dequeuing
        }

        Object val = null;
        lock(o2.queue) {
            val = o2.queue[0];
            o2.queue.RemoveAt(0);
        }

        o2.parentWait.Set(); // notify parent to produce more

        lock(o2.sb)
            o2.sb.AppendLine("Loading data to SQL: " + val);

        Thread.Sleep(500);
    }
}
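
As one possible improvement, the same bounded hand-off can be sketched with `BlockingCollection<T>` from `System.Collections.Concurrent`, which does the waiting on both sides itself. This is a swapped-in technique, not the code above; `BoundedPipeline` and `Run` are hypothetical names, and in the import `T` would be `DataTable`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BoundedPipeline
{
    // Feeds `items` to `consume` through a bounded queue.
    // The caller's thread produces; one worker task consumes.
    // At most `capacity` items are pending at any time.
    public static void Run<T>(IEnumerable<T> items, Action<T> consume, int capacity)
    {
        using (var queue = new BlockingCollection<T>(capacity))
        {
            Task consumer = Task.Run(() =>
            {
                // Blocks while the queue is empty; the loop ends once
                // CompleteAdding() was called and the queue is drained.
                foreach (T item in queue.GetConsumingEnumerable())
                    consume(item);
            });

            try
            {
                foreach (T item in items)
                    queue.Add(item); // blocks while `capacity` items are pending
            }
            finally
            {
                queue.CompleteAdding(); // tell the consumer no more items come
            }

            consumer.Wait(); // rethrows consumer failures as AggregateException
        }
    }
}
```

A call such as `BoundedPipeline.Run(batches, table => bcp.WriteToServer(table), 2)` would keep at most two finished batches waiting while the file is still being read. One limitation of this sketch: if `consume` throws while the producer is blocked in `Add`, the producer is not woken up, so real code would need a cancellation token or similar.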



Licensed under: CC-BY-SA with attribution
Not affiliated with Stack Overflow
Is this KB legal? Yes, learn why