SQL Server大量数据秒级插入/新增/删除

本文涉及的产品
RDS SQL Server Serverless,2-4RCU 50GB 3个月
推荐场景:
云数据库 RDS SQL Server,基础系列 2核4GB
简介: 原文:SQL Server大量数据秒级插入/新增/删除 1.快速保存,该方法有四个参数,第一个参数为数据库连接,第二个参数为需要保存的DataTable,该参数的TableName属性需要设置为数据库中目标数据表的表名,第三个参数为输出参数,如果保存过程中发生错误则错误信息会输出在这个参数里面,第四个参数为可选参数,是否保持连接为打开状态。
原文: SQL Server大量数据秒级插入/新增/删除

1.快速保存,该方法有四个参数,第一个参数为数据库连接,第二个参数为需要保存的DataTable,该参数的TableName属性需要设置为数据库中目标数据表的表名,第三个参数为输出参数,如果保存过程中发生错误则错误信息会输出在这个参数里面,第四个参数为可选参数,是否保持连接为打开状态。

        /// <summary>
        /// 快速保存数据,自动识别insert和update
        /// </summary>
        /// <param name="_sourceTable">需要保存的源数据表</param>
        /// <param name="_sqlCon">数据库连接</param>
        /// <param name="_errorMsg">输出参数,错误信息</param>
        /// <param name="KeepConnectionAlive">是否保持连接,可选参数,默认否</param>
        /// <returns></returns>
        private bool BulkSave(DataTable _sourceTable, SqlConnection _sqlCon,out string _errorMsg, bool _keepConnectionAlive = false)
        {
            bool result = true;
           _errorMsg = string.Empty;
            DataTable sourceTable = _sourceTable.Copy();
            if (string.IsNullOrEmpty(sourceTable.TableName))
            {
                _errorMsg = "数据源表的TableName属性不能为空!";
                return false;
            }
            List<string> colList = new List<string>();
            foreach (DataColumn col in sourceTable.Columns)
            {
                colList.Add(col.ColumnName);
            }
            int updateNum, insertNum;
            updateNum = insertNum = 0;
            try
            {
                #region
                if (_sqlCon.State == ConnectionState.Closed)
                {
                    _sqlCon.Open();
                }
                SqlCommand cmd = _sqlCon.CreateCommand();
                StringBuilder sb = new StringBuilder();
                DataTable pk = new DataTable();
                string tempTableName = "#" + sourceTable.TableName;//#表名 为当前连接有效的临时表 ##表名 为全局有效的临时表
                string tempTableFullCloumn = "";//临时表获取表结构命令字符串
                string updateSetStr = "";//update set 命令字符串
                string insertWhereStr = "";//insert 命令用来排除已经存在记录的 not exist 命令中where条件字符串
                string insertColumnStr = "";//列名字符串
                string tempColmunstr = "";//t.+列名 字符串

                sb = new StringBuilder();
                sb.AppendFormat(@"select a.name as Name,b.name as 'type',a.length as 'length' ,a.collation as 'collation' from syscolumns a
                                  left join systypes b 
                                  on a.xtype = b.xtype 
                                    where colid in 
                                        (select colid from sysindexkeys 
                                            where id = object_id('{0}') 
                                            and indid = 
                                                (select indid from sysindexes 
                                                    where name = (select name from sysobjects 
                                                        where xtype='PK' 
                                                        and parent_obj = object_id('{0}')
                                                                  )
                                                 )
                                         ) and a.id = object_id('{0}');", sourceTable.TableName);
                cmd.CommandText = sb.ToString();
                pk.Load(cmd.ExecuteReader());//查询主键列表
                #endregion

                #region
                /* 利用传递进来的DataTable列名列表,从数据库的源表获取
                     * 临时表的表结构*/
                for (int i = 0; i < colList.Count; i++)
                {

                    /* 如果当前列是主键,set命令字符串跳过不作处理,
                     * 临时表获取表结构命令字符串不论何种情况都不跳过 */

                    if (pk.Select("Name= '" + (colList[i]) + "'").Length > 0)
                    {
                        string sql = string.Format("SELECT COLUMNPROPERTY(OBJECT_ID('{0}'), '{1}', 'IsIdentity')", sourceTable.TableName, colList[i]);
                        cmd.CommandText = sql;
                       bool  flag = Convert.ToBoolean(cmd.ExecuteScalar());
                        if (!flag)
                        {
                            if (updateSetStr.Length > 0)
                            {
                                updateSetStr += ",";
                            }
                            if (insertColumnStr.Length > 0)
                            {
                                insertColumnStr += ",";
                            }
                            if (tempColmunstr.Length > 0)
                            {
                                tempColmunstr += ",";
                            }
                            updateSetStr += colList[i] + "= t." + colList[i];
                            insertColumnStr += colList[i];
                            tempColmunstr += colList[i];
                        }
                    }
                    else
                    {
                        if (updateSetStr.Length > 0)
                        {
                            updateSetStr += ",";
                        }
                        if (insertColumnStr.Length > 0)
                        {
                            insertColumnStr += ",";
                        }
                        if (tempColmunstr.Length > 0)
                        {
                            tempColmunstr += ",";
                        }
                        updateSetStr += colList[i] + "= t." + colList[i];
                        insertColumnStr += colList[i];
                        tempColmunstr += colList[i];
                    }

                    if (i > 0)
                    {
                        tempTableFullCloumn += ",";
                    }


                    tempTableFullCloumn += "s." + colList[i];
                }
                #endregion

                #region
                sb = new StringBuilder();
                sb.AppendFormat("select top 0 {0} into {1} from {2} s;", tempTableFullCloumn, tempTableName, sourceTable.TableName);
                cmd.CommandText = sb.ToString();
                cmd.ExecuteNonQuery();//创建临时表

                /* 根据获得的目标表主键,来为SQL Server 系统中的临时表增加相应的非主键但是数据相等
                 * 的 影射列,因为有些系统的主键为自增类型,在调用bulk.WriteToServer方法的时候,自增主键会
                 * 在临时表中从0开始计算,没办法用临时表的主键和目标表的主键做 where 条件,故用影射列代替*/
                for (int i = 0; i < pk.Rows.Count; i++)
                {
                    if (i > 0)
                    {
                        insertWhereStr += " and ";
                    }
                    string newColName = pk.Rows[i]["name"].ToString() + "New";
                    sb = new StringBuilder();
                    switch (pk.Rows[i]["type"].ToString())
                    {
                        case "char":
                        case "varchar":
                        case "nchar":
                        case "nvarchar":
                            sb.AppendFormat("alter table {0} add {1} {2}({3}) ", tempTableName, newColName, pk.Rows[i]["Type"].ToString(), pk.Rows[i]["length"]);
                            break;
                        default:
                            sb.AppendFormat("alter table {0} add {1} {2} ", tempTableName, newColName, pk.Rows[i]["Type"].ToString());
                            break;
                    }
                    if (!(pk.Rows[i]["collation"] is DBNull))
                    {
                        sb.AppendFormat("COLLATE {0}", pk.Rows[i]["collation"]);
                    }
                    cmd.CommandText = sb.ToString();
                    cmd.ExecuteNonQuery();


                    sourceTable.Columns.Add(new DataColumn(newColName, sourceTable.Columns[pk.Rows[i]["name"].ToString()].DataType));
                    foreach (DataRow dr in sourceTable.Rows)
                    {
                        dr[newColName] = dr[pk.Rows[i]["name"].ToString()].ToString().Trim();
                    }
                    insertWhereStr += "t." + newColName + "=s." + pk.Rows[i]["name"];
                }

                using (System.Data.SqlClient.SqlBulkCopy bulk = new System.Data.SqlClient.SqlBulkCopy(_sqlCon))
                {
                    //string SQl = "select * from #bulktable ";
                    //DataTable tempx = new DataTable();
                    //cmd.CommandText = SQl;
                    //tempx.Load(cmd.ExecuteReader());
                    //_souceTable.Rows[0]["unit_name"] = string.Empty;
                    //_souceTable.Rows[1]["unit_name"] = string.Empty;
                    int colCount = sourceTable.Columns.Count;
                    foreach (DataRow row in sourceTable.Rows)
                    {
                        for (int i = 0; i < colCount; i++)
                        {
                            row[i] = row[i].ToString().Trim();
                        }
                    }
                    bulk.DestinationTableName = tempTableName;
                    bulk.BulkCopyTimeout = 36000;
                    try
                    {
                        bulk.WriteToServer(sourceTable);//将数据写入临时表
                        //string sql = "select * from #bulktable";
                        //SqlDataAdapter sda = new SqlDataAdapter(sql, _sqlCon);
                        //DataTable dt = new DataTable();
                        //sda.Fill(dt);
                    }
                    catch (Exception e)
                    {
                       _errorMsg = e.Message;
                        result = false;
                        //MessageBox.Show(e.Message);
                        //return e.Message.Trim();
                    }
                }
                #endregion

                #region
                if (insertWhereStr.Equals(""))//如果不存在主键
                {
                    sb = new StringBuilder();
                    sb.AppendFormat("insert into {0} select {1} from {2} s;", sourceTable.TableName, tempTableFullCloumn, tempTableName);
                    cmd.CommandText = sb.ToString();
                    insertNum = cmd.ExecuteNonQuery();//插入临时表数据到目的表
                   //_errorMsg = "1";
                }
                else
                {
                    sb = new StringBuilder();

                    sb.AppendFormat("update {0} set {1} from( {2} t INNER JOIN {0} s on {3} );",
                                    sourceTable.TableName, updateSetStr, tempTableName, insertWhereStr);
                    //cmd.CommandText = sb.ToString();
                    //Stopwatch sw = new Stopwatch();
                    //sw.Start();

                    //updateNum = cmd.ExecuteNonQuery();//更新已存在主键数据
                   //_errorMsg += "更新" + updateNum + "条记录";
                    //sw.Stop();
                    //sb = new StringBuilder();
                    sb.AppendFormat("insert into {0}({4}) select {1} from {2} t where not EXISTS(select 1 from {0} s where {3});",
                                     sourceTable.TableName, tempColmunstr, tempTableName, insertWhereStr, insertColumnStr);
                    cmd.CommandText = sb.ToString();
                    //insertNum = cmd.ExecuteNonQuery();//插入新数据
                    //_errorMsg += "插入" + insertNum + "条记录";
                    //MessageBox.Show("共用时" + sw.Elapsed + "\n 共新增:" + insertNum + "条记录,更新:" + updateNum + "条记录!");
                    //return_str = "1";
                    var st = _sqlCon.BeginTransaction();
                    cmd.Transaction = st;
                    try
                    {
                        cmd.ExecuteNonQuery();
                        st.Commit();
                    }
                    catch (Exception ee)
                    {
                        _errorMsg += ee.Message;
                        result = false;
                        st.Rollback();
                    }

                }
                #endregion
            }
            catch (Exception e)
            {
               _errorMsg = e.Message.Trim();
                result = false;
            }
            finally
            {
                if (!_keepConnectionAlive && _sqlCon.State == ConnectionState.Open)
                {
                    _sqlCon.Close();
                }
            }
            return result;
        }

2.快速删除,该方法有四个参数,第一个参数为数据库连接,第二个参数为需要删除的DataTable,该参数的TableName属性需要设置为数据库中目标数据表的表名,第三个参数为输出参数,如果删除过程中发生错误则错误信息会输出在这个参数里面,第四个参数为可选参数,是否保持连接为打开状态。

        /// <summary>
        /// 快速删除
        /// </summary>
        /// <param name="_sourceTable">需要删除的源数据表</param>
        /// <param name="_sqlCon">数据库连接</param>
        /// <param name="_errorMsg">输出参数,错误信息</param>
        /// <param name="_keepConnectionAlive">是否保持连接,可选参数,默认否</param>
        /// <returns></returns>
        private bool BulkDelete(DataTable _sourceTable, SqlConnection _sqlCon, out string _errorMsg, bool _keepConnectionAlive = false)
        {
            bool result = true;
            _errorMsg = string.Empty;
            DataTable sourceTable = _sourceTable.Copy();

            string SQl = "";
            DataTable pkTable = new DataTable();
            DataSet ds = new DataSet();
            string whereStr = string.Empty;
            string colList = string.Empty;
            if (string.IsNullOrEmpty(sourceTable.TableName))
            {
                _errorMsg += "数据源表的TableName属性不能为空!";
                return false;
            }
            try
            {
                #region 检查数据表是否存在
                SqlCommand sqlComm = _sqlCon.CreateCommand();
                SqlDataAdapter sda = new SqlDataAdapter();
                string tempTableName = "#" + sourceTable.TableName;
                SQl = string.Format("select COUNT(*) from sysobjects where id = object_id(N'[{0}]') and OBJECTPROPERTY(id, N'IsUserTable') = 1", sourceTable.TableName);
                sqlComm.CommandText = SQl;
                if (_sqlCon.State != ConnectionState.Open)
                {
                    _sqlCon.Open();
                }
                int count = Convert.ToInt32(sqlComm.ExecuteScalar());
                #endregion

                if (count == 0)
                {
                    _errorMsg += string.Format("在数据库中,找不到名为{0}的数据表!", sourceTable.TableName);
                }
                else
                {
                    #region 获取主键信息
                    SQl = string.Format(@"select a.name as Name,b.name as 'type',a.length as 'length' ,a.collation as 'collation' from syscolumns a left join systypes b on a.xtype = b.xtype where colid in (select colid from sysindexkeys where id = object_id('{0}') and indid = (select indid from sysindexes where name = (select name from sysobjects where xtype='PK' and parent_obj = object_id('{0}')))) and a.id = object_id('{0}');", sourceTable.TableName);
                    sqlComm.CommandText = SQl;
                    sda.SelectCommand = sqlComm;
                    sda.Fill(ds, "pkTable");
                    pkTable = ds.Tables["pkTable"];
                    #endregion

                    #region 生成where条件
                    foreach (DataColumn col in sourceTable.Columns)
                    {
                        colList += colList.Length == 0 ? col.ColumnName : "," + col.ColumnName;
                    }

                    SQl = string.Format("select top 0 {0} into {1} from {2}", colList, tempTableName, sourceTable.TableName);
                    sqlComm.CommandText = SQl;
                    sqlComm.ExecuteNonQuery();
                    if (pkTable.Rows.Count <= 0)
                    {
                        _errorMsg += string.Format("获取{0}表主键信息失败,请重试或者检查数据库!", sourceTable.TableName);

                    }
                    else
                    {
                        foreach (DataRow dr in pkTable.Rows)
                        {
                            string newColName = dr["name"].ToString() + "New";
                            /* 如果当前列是主键,set命令字符串跳过不作处理,
                             * 临时表获取表结构命令字符串不论何种情况都不跳过 */
                            SQl = string.Format("SELECT COLUMNPROPERTY(OBJECT_ID('{0}'), '{1}', 'IsIdentity')", sourceTable.TableName, dr["name"]);
                            sqlComm.CommandText = SQl;
                           bool  flag = Convert.ToBoolean(sqlComm.ExecuteScalar());
                            switch (dr["type"].ToString())
                            {
                                case "char":
                                case "varchar":
                                case "nchar":
                                case "nvarchar":
                                    SQl = string.Format("alter table {0} add {1} {2}({3}) ", tempTableName, newColName, dr["Type"].ToString(), dr["length"]);
                                    break;
                                default:
                                    SQl = string.Format("alter table {0} add {1} {2} ", tempTableName, newColName, dr["Type"].ToString());
                                    break;
                            }
                            if (!(dr["collation"] is DBNull))
                            {
                                SQl = string.Format("{0} COLLATE {1}", SQl, dr["collation"]);
                            }
                            sqlComm.CommandText = SQl;
                            sqlComm.ExecuteNonQuery();

                            whereStr += string.IsNullOrEmpty(whereStr) ? string.Format("{0}.{2} in( select {1}.[{3}] from {1} )", sourceTable.TableName, tempTableName, dr["name"], newColName) : string.Format(" and {0}.{2} in( select {1}.[{3}] from {1} )", sourceTable.TableName, tempTableName, dr["name"], newColName);
                            sourceTable.Columns.Add(new DataColumn(newColName, sourceTable.Columns[dr["name"].ToString()].DataType));
                            foreach (DataRow row in sourceTable.Rows)
                            {
                                row[newColName] = row[dr["name"].ToString()].ToString().Trim();
                            }
                        }
                    }
                }
                #endregion

                #region 将数据放进临时表

                SqlBulkCopy bulk = new SqlBulkCopy(_sqlCon);
                bulk.DestinationTableName = tempTableName;
                bulk.BulkCopyTimeout = 3600;
                try
                {
                    bulk.WriteToServer(sourceTable);
                }
                catch (Exception ee)
                {
                    _errorMsg += ee.Message;
                    bulk.Close();
                }
                #endregion

                #region 开始删除
                //SQl = string.Format("select * from {0}", tempTableName);
                //sqlComm.CommandText = SQl;
                //sda.SelectCommand = sqlComm;
                //sda.Fill(ds, tempTableName);
                SQl = string.Format(@" DELETE FROM {0}  WHERE {1}", sourceTable.TableName, whereStr);
                sqlComm.CommandText = SQl;
                var tx = _sqlCon.BeginTransaction();
                try
                {
                    sqlComm.Transaction = tx;
                    count = sqlComm.ExecuteNonQuery();
                    tx.Commit();
                    _errorMsg += string.Format("应该删除{0}条记录\r\n共删除{1}条记录!", sourceTable.Rows.Count, count);
                }
                catch (Exception ee)
                {
                    _errorMsg += ee.Message;
                    tx.Rollback();
                }
                #endregion
            }
            catch (Exception e)
            {
                _errorMsg += e.Message;
            }
            finally
            {
                if (_sqlCon.State == ConnectionState.Open && !_keepConnectionAlive)
                {
                    _sqlCon.Close();
                }
            }
            return result;
        }
相关实践学习
使用SQL语句管理索引
本次实验主要介绍如何在RDS-SQLServer数据库中,使用SQL语句管理索引。
SQL Server on Linux入门教程
SQL Server数据库一直只提供Windows下的版本。2016年微软宣布推出可运行在Linux系统下的SQL Server数据库,该版本目前还是早期预览版本。本课程主要介绍SQLServer On Linux的基本知识。 相关的阿里云产品:云数据库RDS&nbsp;SQL Server版 RDS SQL Server不仅拥有高可用架构和任意时间点的数据恢复功能,强力支撑各种企业应用,同时也包含了微软的License费用,减少额外支出。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/sqlserver
目录
相关文章
|
1月前
|
SQL 存储 缓存
SQL Server 数据太多如何优化
11种优化方案供你参考,优化 SQL Server 数据库性能得从多个方面着手,包括硬件配置、数据库结构、查询优化、索引管理、分区分表、并行处理等。通过合理的索引、查询优化、数据分区等技术,可以在数据量增大时保持较好的性能。同时,定期进行数据库维护和清理,保证数据库高效运行。
|
2月前
|
SQL 移动开发 Oracle
SQL语句实现查询连续六天数据的方法与技巧
在数据库查询中,有时需要筛选出符合特定时间连续性条件的数据记录
|
2月前
|
SQL 存储 关系型数据库
添加数据到数据库的SQL语句详解与实践技巧
在数据库管理中,添加数据是一个基本操作,它涉及到向表中插入新的记录
|
2月前
|
SQL 数据挖掘 数据库
SQL查询每秒的数据:技巧、方法与性能优化
id="">SQL查询功能详解 SQL(Structured Query Language,结构化查询语言)是一种专门用于与数据库进行沟通和操作的语言
|
2月前
|
SQL 监控 数据处理
SQL数据库数据修改操作详解
数据库是现代信息系统的重要组成部分,其中SQL(StructuredQueryLanguage)是管理和处理数据库的重要工具之一。在日常的业务运营过程中,数据的准确性和及时性对企业来说至关重要,这就需要掌握如何在数据库中正确地进行数据修改操作。本文将详细介绍在SQL数据库中如何修改数据,帮助读者更好
334 4
|
2月前
|
SQL 关系型数据库 MySQL
SQL批量插入测试数据的几种方法?
SQL批量插入测试数据的几种方法?
114 1
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
96 0
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
43 0
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
56 0
|
2月前
|
SQL
使用SQL进行集合查询和数据维护
使用SQL进行集合查询和数据维护
41 0