Supplying a stream as the source of data for a binary column when SqlBulkCopy is used

ado.net c# idatareader sqlbulkcopy sql-server

Question

If one needs to read data from SQL Server in a streamed fashion, there are capabilities for that, such as using SqlDataReader with CommandBehavior.SequentialAccess. In particular, when binary column data needs to be accessed, there is the GetStream(int) method:

var cmd = new SqlCommand();
cmd.Connection = connection;
cmd.CommandText = @"select 0x0123456789 as Data";

using (var dr = cmd.ExecuteReader(CommandBehavior.SequentialAccess))
{
    dr.Read();

    var stream = dr.GetStream(0);
    // access stream
}
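
For illustration, the "access stream" step could copy the column to another stream in fixed-size chunks, so the whole value is never held in memory at once. This is only a sketch: the helper name CopyColumnTo and the 80 KB buffer size are assumptions made here, not part of the original code.

```csharp
using System.Data.Common;
using System.IO;

static class ReaderStreamExtensions
{
    // Hypothetical helper: copies the binary column at `ordinal` of the current row
    // into `destination` chunk by chunk and returns the number of bytes copied.
    public static long CopyColumnTo(this DbDataReader reader, int ordinal, Stream destination)
    {
        using (Stream source = reader.GetStream(ordinal))
        {
            var buffer = new byte[81920]; // assumed chunk size
            long total = 0;
            int read;
            while ((read = source.Read(buffer, 0, buffer.Length)) > 0)
            {
                destination.Write(buffer, 0, read);
                total += read;
            }
            return total;
        }
    }
}
```

With the reader above, this would be called as dr.CopyColumnTo(0, someOutputStream) after dr.Read().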

But what about streaming data in the opposite direction, when one needs to feed data to SQL Server using SqlBulkCopy, and in particular when a stream needs to be supplied as the source of data for a binary column?

I tried the following:

var cmd2 = new SqlCommand();
cmd2.Connection = connection;
cmd2.CommandText = @"create table #Test (ID int, Data varbinary(max))";
cmd2.ExecuteNonQuery();

using (SqlBulkCopy sbc = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, null))
{
    sbc.DestinationTableName = "#Test";
    sbc.EnableStreaming = true;

    sbc.ColumnMappings.Add(0, "ID");
    sbc.ColumnMappings.Add(1, "Data");

    sbc.WriteToServer(new TestDataReader());
}

Where TestDataReader implements IDataReader as follows:

class TestDataReader : IDataReader
{
    public int FieldCount { get { return 2; } }
    int rowCount = 1;
    public bool Read() { return (rowCount++) < 3; }
    public bool IsDBNull(int i) { return false; }

    public object GetValue(int i)
    {
        switch (i)
        {
            case 0: return rowCount;
            case 1: return new byte[] { 0x01, 0x23, 0x45, 0x67, 0x89 };
            default: throw new Exception();
        }
    }

    // the remaining members of IDataReader
}

and it worked as expected.

However, changing

case 1: return new byte[] { 0x01, 0x23, 0x45, 0x67, 0x89 };

to

case 1: return new MemoryStream(new byte[] { 0x01, 0x23, 0x45, 0x67, 0x89 });

caused a System.InvalidOperationException with the message:

The given value of type MemoryStream from the data source cannot be converted to type varbinary of the specified target column.

Is there a way to supply a Stream from an IDataReader (or perhaps a DbDataReader) to SqlBulkCopy as the source of data for a binary column, without first copying all of its data into memory (a byte array)?

Accepted Answer

I'm not sure this is documented anywhere, but a short inspection of the SqlBulkCopy source code shows that it treats different data readers in different ways. SqlBulkCopy does support streaming and GetStream, but note that the IDataReader interface does not contain a GetStream method. So when you feed a custom IDataReader implementation to SqlBulkCopy, it will not treat binary columns as streamed and will not accept values of type Stream.

DbDataReader, on the other hand, does have this method. If you feed SqlBulkCopy an instance of a class that inherits from DbDataReader (with EnableStreaming set to true, as in your code), it will treat binary columns in a streamed manner and will call DbDataReader.GetStream for them.

So to fix your problem, inherit from DbDataReader like this:

class TestDataReader : DbDataReader
{
    public override bool IsDBNull(int ordinal) {
        return false;
    }

    public override int FieldCount { get; } = 2;
    int rowCount = 1;

    public override bool HasRows { get; } = true;
    public override bool IsClosed { get; } = false;

    public override bool Read()
    {
        return (rowCount++) < 3;
    }

    public override object GetValue(int ordinal) {
        switch (ordinal) {
            // do not return anything for binary column here - it will not be called
            case 0:
                return rowCount;
            default:
                throw new Exception();
        }
    }

    public override Stream GetStream(int ordinal) {
        // instead - return your stream here
        if (ordinal == 1)
            return new MemoryStream(new byte[] {0x01, 0x23, 0x45, 0x67, 0x89});
        throw new Exception();
    }

    // the remaining DbDataReader members are irrelevant here and omitted
}
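
For readers who want something that compiles, here is a minimal sketch of a complete DbDataReader subclass along the same lines. Everything specific in it is an assumption made for illustration: the class name FileStreamDataReader, the idea of producing one row per file path, and the choice to dispose each row's stream on the next Read(). It uses C# 7 syntax, and members that make no sense for these two columns simply throw. It has not been verified against every SqlClient version, so treat it as a starting point rather than a reference implementation.

```csharp
using System;
using System.Collections;
using System.Data;
using System.Data.Common;
using System.IO;

// Hypothetical reader: one (ID, Data) row per file; Data is served as a Stream.
sealed class FileStreamDataReader : DbDataReader
{
    private readonly string[] _paths; // assumed input: one file per row
    private int _row = -1;            // -1 until the first Read()
    private Stream _current;          // stream opened for the current row, if any
    private bool _closed;

    public FileStreamDataReader(params string[] paths) { _paths = paths; }

    public override int FieldCount => 2;
    public override bool HasRows => _paths.Length > 0;
    public override bool IsClosed => _closed;
    public override int Depth => 0;
    public override int RecordsAffected => -1;

    public override bool Read()
    {
        // Assumes the consumer is done with the previous row's stream by now.
        _current?.Dispose();
        _current = null;
        return ++_row < _paths.Length;
    }

    public override bool NextResult() => false;
    public override bool IsDBNull(int ordinal) => false;
    public override void Close() { _current?.Dispose(); _current = null; _closed = true; }

    public override object GetValue(int ordinal)
    {
        if (ordinal == 0) return _row + 1; // 1-based ID
        throw new NotSupportedException("Column 1 is available through GetStream only.");
    }

    public override Stream GetStream(int ordinal)
    {
        if (ordinal != 1) throw new NotSupportedException();
        return _current = File.OpenRead(_paths[_row]); // opened on demand, not buffered whole
    }

    public override int GetOrdinal(string name)
    {
        if (name == "ID") return 0;
        if (name == "Data") return 1;
        throw new IndexOutOfRangeException(name);
    }

    public override string GetName(int ordinal) => ordinal == 0 ? "ID" : "Data";
    public override Type GetFieldType(int ordinal) => ordinal == 0 ? typeof(int) : typeof(byte[]);
    public override string GetDataTypeName(int ordinal) => GetFieldType(ordinal).Name;
    public override int GetInt32(int ordinal) => (int)GetValue(ordinal);
    public override object this[int ordinal] => GetValue(ordinal);
    public override object this[string name] => GetValue(GetOrdinal(name));
    // Only the scalar column is materialised here; the binary column stays a stream.
    public override int GetValues(object[] values) { values[0] = GetValue(0); return 1; }
    public override IEnumerator GetEnumerator() => new DbEnumerator(this);
    public override DataTable GetSchemaTable() => null;

    // Typed getters that make no sense for these two columns.
    public override bool GetBoolean(int ordinal) => throw new NotSupportedException();
    public override byte GetByte(int ordinal) => throw new NotSupportedException();
    public override long GetBytes(int ordinal, long dataOffset, byte[] buffer, int bufferOffset, int length) => throw new NotSupportedException();
    public override char GetChar(int ordinal) => throw new NotSupportedException();
    public override long GetChars(int ordinal, long dataOffset, char[] buffer, int bufferOffset, int length) => throw new NotSupportedException();
    public override DateTime GetDateTime(int ordinal) => throw new NotSupportedException();
    public override decimal GetDecimal(int ordinal) => throw new NotSupportedException();
    public override double GetDouble(int ordinal) => throw new NotSupportedException();
    public override float GetFloat(int ordinal) => throw new NotSupportedException();
    public override Guid GetGuid(int ordinal) => throw new NotSupportedException();
    public override short GetInt16(int ordinal) => throw new NotSupportedException();
    public override long GetInt64(int ordinal) => throw new NotSupportedException();
    public override string GetString(int ordinal) => throw new NotSupportedException();
}
```

An instance would be passed to sbc.WriteToServer(...) in place of TestDataReader, with the same column mappings and EnableStreaming = true as in the question. Opening the file inside GetStream rather than in the constructor means only the current row's file is open at any time.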

Popular Answer

See the following code:

static int SendOrders(int totalToSend)
{
    using (SqlConnection con = new SqlConnection(connectionString))
    {
        con.Open();
        using (SqlTransaction tran = con.BeginTransaction())
        {
            var newOrders =
                from i in Enumerable.Range(0, totalToSend)
                select new Order
                {
                    customer_name = "Customer " + i % 100,
                    quantity = i % 9,
                    order_id = i,
                    order_entry_date = DateTime.Now
                };

            SqlBulkCopy bc = new SqlBulkCopy(con,
                SqlBulkCopyOptions.CheckConstraints |
                SqlBulkCopyOptions.FireTriggers |
                SqlBulkCopyOptions.KeepNulls, tran);

            bc.BatchSize = 1000;
            bc.DestinationTableName = "order_queue";
            bc.WriteToServer(newOrders.AsDataReader());

            tran.Commit();
        }
        con.Close();
    }

    return totalToSend;
}


Licensed under: CC-BY-SA with attribution
Not affiliated with Stack Overflow