Мне нужно эффективно импортировать большое количество данных из файла в базу данных. У меня мало файлов rrf, которые содержат эти данные, размер файла может быть> 400 МБ, и в итоге он может быть> 2 миллиона записей в базу данных из файла.
Что я сделал:
Я читаю необходимые записи в DataTable.
using (StreamReader streamReader = new StreamReader(filePath))
{
IEnumerable<string> values = new List<string>();
while (!streamReader.EndOfStream)
{
string line = streamReader.ReadLine().Split('|');
int index = 0;
var dataRow = dataTable.NewRow();
foreach (var value in values)
{
dataRow[index] = value;
index++;
}
dataTable.Rows.Add(dataRow);
}
}
Чем в транзакции (это критическая точка), я вставляю данные из DataTable
через SqlBulkCopy
в базу данных.
var bcp = new SqlBulkCopy(_sqlConnection, SqlBulkCopyOptions.Default, transaction);
bcp.DestinationTableName = tableName;
bcp.WriteToServer(dataTable);
Проблема заключается в том, что, поскольку каждый DataTable
может содержать более 2 миллионов записей, я использую много оперативной памяти (около 2 ГБ) для хранилища DataTable
.
Вещи как
dataTable.Dispose();
dataTable = null;
или
GC.Collect();
GC.SuppressFinalize();
на самом деле не помогают.
Свойство Batchsize
для SqlBulkCopy
имеет к этому никакого отношения, вся память берется с помощью DataTable
который хранит строки, которые необходимо вставить.
Интересно, есть ли эффективный способ читать данные и использовать SqlBulkCopy
с ним?
По моему опыту, оптимальный размер DataTable для массовой вставки находится где-то между 60 000 строк и 100 000 строк. Кроме того, я нашел повторное использование DataTable медленнее, чем клонирование нового. DataTable.Rows.Clear () не очищает ограничения, а добавление новых строк происходит намного медленнее после первой объемной вставки. DataTable.Clear () намного лучше, но начиная с нового DataTable каждый массив был самым быстрым.
Таким образом, ваш код будет выглядеть так:
int batchSize = 65000;
bool lastLine = streamReader.EndOfStream;
if (dataTable.Rows.Count == batchSize || lastLine) {
// do bulk insert
DataTable temp = dataTable.Clone();
dataTable.Dispose();
dataTable = temp;
}
В дополнение к этому вы можете разделить объемную вставку на свой собственный поток. Таким образом, поток чтения файла будет создавать объекты DataTable, которые будут потреблять ваши объемные вставки. Вам нужно будет добавить семафоры, чтобы убедиться, что поток чтения файла не перепроизводствует, иначе вы будете использовать слишком много памяти.
Вот пример кода продукта / потребления. Не стесняйтесь делать улучшения.
Вы можете играть со временем сна, чтобы увидеть, как код ждет либо на стороне производителя, либо на стороне потребителя.
public static void produce() {
DataObject o2 = new DataObject();
Thread t = new Thread(consume);
t.Start(o2);
for (int i = 0; i < 10; i++) {
if (o2.queue.Count > 2) {
lock(o2.sb)
o2.sb.AppendLine("3 items read, waiting for consume to finish");
o2.wait.Set();
o2.parentWait.WaitOne();
o2.parentWait.Reset();
}
Thread.Sleep(500); // simulate reading data
lock(o2.sb)
o2.sb.AppendLine("Read file: " + i);
lock(o2.queue) {
o2.queue.Add(i);
}
o2.wait.Set();
}
o2.finished = true;
o2.wait.Set();
}
public class DataObject {
public bool finished = false;
public List<int> queue = new List<int>();
public ManualResetEvent wait = new ManualResetEvent(false);
public ManualResetEvent parentWait = new ManualResetEvent(false);
public StringBuilder sb = new StringBuilder();
}
public static void consume(Object o) {
DataObject o2 = (DataObject) o;
while (true) {
if (o2.finished && o2.queue.Count == 0)
break;
if (o2.queue.Count == 0) {
lock(o2.sb)
o2.sb.AppendLine("Nothing in queue, waiting for produce.");
o2.wait.WaitOne();
o2.wait.Reset();
}
Object val = null;
lock(o2.queue) {
val = o2.queue[0];
o2.queue.RemoveAt(0);
}
o2.parentWait.Set(); // notify parent to produce more
lock(o2.sb)
o2.sb.AppendLine("Loading data to SQL: " + val);
Thread.Sleep(500);
}
}