以下内容大部分来自:
http://blog.csdn.net/tjvictor/article/details/4360030
部分内容出自互联网,实验结果为亲测。
最近自己开发一个向数据库中插入大量历史数据的函数库,需要解决一个大数据量插入的效率问题。不用分析,我知道如果采取逐条数据插入的方式,那么效率肯定很低,光是那么多循环就知道很慢了。于是乎,我找到了上篇博客,知道了BulkCopy和TVPs方式。为了更好的了解其效率,我自己动手亲测了一下效果,测试的数据库位于本机。
(1)方式1:循环插入
public static void NormalInerst(String connString) { Console.WriteLine("使用NNormalInerst方式:"); Stopwatch sw = new Stopwatch(); SqlConnection sqlConn = new SqlConnection(connString); SqlCommand sqlCmd = new SqlCommand(); sqlCmd.CommandText = String.Format("insert into BulkTestTable(Id,UserName,Pwd)values(@p0,@p1,@p2)"); sqlCmd.Parameters.Add("@p0", SqlDbType.Int); sqlCmd.Parameters.Add("@p1", SqlDbType.NVarChar); sqlCmd.Parameters.Add("@p2", SqlDbType.VarChar); sqlCmd.CommandType = CommandType.Text; sqlCmd.Connection = sqlConn; sqlConn.Open(); try { for (int i = 0, j = 0; i < 10; ++i ) { for (j = i * 10000; j < (i + 1) * 10000; ++j ) { sqlCmd.Parameters["@p0"].Value = j; sqlCmd.Parameters["@p1"].Value = String.Format("User-{0}", i * j); sqlCmd.Parameters["@p2"].Value = String.Format("Pwd-{0}", i * j); sw.Start(); sqlCmd.ExecuteNonQuery(); sw.Stop(); } Console.WriteLine("第{0}次插入{1}条数据耗时:{2}", (i + 1), dataScale, sw.ElapsedMilliseconds); sw.Reset(); } } catch (System.Exception ex) { throw ex; } finally { sqlConn.Close(); } }
该方式的效率极低,运行时间很长,我这里就不给出结果了,有兴趣可以自己粘贴试一下。PS:其中的数据规模应该是dataScale而不是10000,不过总是还是慢。
(2)方式2:使用BulkCopy
public static void BulkInerst(String connString) { Console.WriteLine("使用BulkInerst方式:"); Stopwatch sw = new Stopwatch(); String strDel = "delete from BulkTestTable"; float millTime = 0; for (int multiply = 0; multiply < 10; multiply++) { DataTable dt = GetTableSchema(); for (int count = multiply * dataScale; count < (multiply + 1) * dataScale; count++) { DataRow r = dt.NewRow(); r[0] = count; r[1] = string.Format("User-{0}", count * multiply); r[2] = string.Format("Pwd-{0}", count * multiply); dt.Rows.Add(r); } SqlConnection sqlConn = new SqlConnection(connString); SqlBulkCopy bulkCopy = new SqlBulkCopy(sqlConn); bulkCopy.DestinationTableName = "BulkTestTable"; bulkCopy.BatchSize = dt.Rows.Count; sw.Reset(); sw.Start(); try { sqlConn.Open(); if (dt != null && dt.Rows.Count != 0) bulkCopy.WriteToServer(dt); } catch (Exception ex) { throw ex; } finally { sqlConn.Close(); if (bulkCopy != null) bulkCopy.Close(); } sw.Stop(); Console.WriteLine("第{0}次插入{1}条数据耗时:{2}", (multiply + 1), dataScale, sw.ElapsedMilliseconds); millTime += sw.ElapsedMilliseconds; } Console.WriteLine("总耗时:{0}毫秒,平均耗时:{1}毫秒", millTime, millTime / 10); SqlConnection sqlConn2 = new SqlConnection(connString); SqlCommand sqlCmd = new SqlCommand(strDel, sqlConn2); try { sqlConn2.Open(); sqlCmd.ExecuteNonQuery(); } catch (Exception ex) { throw ex; } finally { sqlConn2.Close(); } Console.WriteLine("Done!"); }
(3)方式3:使用TVPs
public static void TVPsInerst(String connString) { Console.WriteLine("使用TVPsInerst方式:"); Stopwatch sw = new Stopwatch(); SqlConnection sqlConn = new SqlConnection(connString); String strSQL = "insert into BulkTestTable (Id,UserName,Pwd)" + " SELECT nc.Id, nc.UserName,nc.Pwd" + " FROM @NewBulkTestTvp AS nc"; String strDel = "delete from BulkTestTable"; float millTime = 0; for (int multiply = 0; multiply < 10; multiply++) { DataTable dt = GetTableSchema(); for (int count = multiply * dataScale; count < (multiply + 1) * dataScale; count++) { DataRow r = dt.NewRow(); r[0] = count; r[1] = string.Format("User-{0}", count * multiply); r[2] = string.Format("Pwd-{0}", count * multiply); dt.Rows.Add(r); } sw.Reset(); sw.Start(); SqlCommand cmd = new SqlCommand(strSQL, sqlConn); SqlParameter catParam = cmd.Parameters.AddWithValue("@NewBulkTestTvp", dt); catParam.SqlDbType = SqlDbType.Structured; catParam.TypeName = "dbo.BulkUDT"; try { sqlConn.Open(); if (dt != null && dt.Rows.Count != 0) { cmd.ExecuteNonQuery(); } } catch (Exception ex) { throw ex; } finally { sqlConn.Close(); } sw.Stop(); Console.WriteLine("第{0}次插入{1}条数据耗时:{2}", (multiply + 1), dataScale, sw.ElapsedMilliseconds); millTime += sw.ElapsedMilliseconds; } Console.WriteLine("总耗时:{0}毫秒,平均耗时:{1}毫秒", millTime, millTime / 10); SqlCommand sqlCmd = new SqlCommand(strDel, sqlConn); try { sqlConn.Open(); sqlCmd.ExecuteNonQuery(); } catch (Exception ex) { throw ex; } finally { sqlConn.Close(); } Console.WriteLine("Done!"); }
这里TVPs方式需要利用Visual Studio 2008采用的自定义数据表类型,这是一个比较新的东西。这里补充几个类型和函数,主要是为了检测数据库中是否存在数据表和数据表类型,如果不存在则进行创建。补充代码如下:
public enum CheckType { isTable = 0, isType } protected static int dataScale = 100000; public static bool CheckExistsObject(String connString, String objectName, CheckType type) { String strSQL = "select COUNT(1) from sys.sysobjects where name=‘" + objectName + "‘"; switch (type) { case CheckType.isTable: strSQL = "select COUNT(1) from sys.sysobjects where name=‘" + objectName + "‘"; break; case CheckType.isType: strSQL = "select COUNT(1) from sys.types where name=‘" + objectName + "‘"; break; default: break; } using (SqlConnection conn = new SqlConnection(connString)) { conn.Open(); SqlCommand cmd = new SqlCommand(strSQL, conn); int result = Convert.ToInt32(cmd.ExecuteScalar()); if (0 == result) { return false; } } return true; } public static bool CreateObject(String connString, String objectName, CheckType type) { String strSQL = ""; switch (type) { case CheckType.isTable: strSQL = "Create table " + objectName + " (Id int primary key, UserName nvarchar(32), Pwd varchar(16))"; break; case CheckType.isType: strSQL = "CREATE TYPE " + objectName + " AS TABLE (Id int, UserName nvarchar(32), Pwd varchar(16))"; break; default: break; } using (SqlConnection conn = new SqlConnection(connString)) { conn.Open(); SqlCommand cmd = new SqlCommand(strSQL, conn); cmd.ExecuteNonQuery(); } return true; } public static DataTable GetTableSchema() { DataTable dt = new DataTable(); dt.Columns.AddRange(new DataColumn[]{ new DataColumn("Id",typeof(int)), new DataColumn("UserName",typeof(string)), new DataColumn("Pwd",typeof(string))}); return dt; }
调用的方式就很好说了,参见如下测试代码:
public static void Main(string[] args) { String conString = "Persist Security Info=False;User ID=sa;Password=scbj123!@#;Initial Catalog=testGR;Server=KLH-PC"; String strType = "BulkUDT"; String strTable = "BulkTestTable"; if (!CheckExistsObject(conString, strType, CheckType.isType)) { Console.WriteLine("类型{0}不存在", strType); if (CreateObject(conString, strType, CheckType.isType)) { Console.WriteLine("类型{0}创建成功!", strType); } } if (!CheckExistsObject(conString, strTable, CheckType.isTable)) { Console.WriteLine("表格{0}不存在", strTable); if (CreateObject(conString, strTable, CheckType.isTable)) { Console.WriteLine("表格{0}创建成功!", strTable); } } Console.WriteLine("=================================================="); //NormalInerst(conString); BulkInerst(conString); TVPsInerst(conString); Console.ReadKey(); }
-------------------------------------------------------------------------------------------------
直接看效果对比:
<1>第一次运行
<2>第二次和第三次运行
这里考虑到了SQL Server自身缓存的原因,所以进行了多次测试,不过数据量没有变。可以从上述结果中看出:TVPs方式不愧是新出的啊,一代更比一代强!