写作背景
因工作环境中存在一个数据库实例多站点部署模式,每次同步数据都需要手动从本地导入目标站点数据库,空余之际写了个简单 Demo
;
技术点或Nuget元包:
.NET Core 3.0 Console
;Microsoft.Data.SqlClient -v1.0.19269.1
;- 开发工具
VS 2019 Pro x64 v16.3.3
; MS-SQLServer 2014 Enterprise
;
实现目标:
本地开发环境的正式数据同步到线上目标数据库(单表全字段同结构模式);
说明:这里使用本地环境同数据库类型的不同 DB
实例模拟实现,假如数据库【TestDB
】为源数据库,【TestDB2
】为目标数据库,都有共同的数据表对象【TestTB
】;
Demo 项目结构:
DHHelper 封装
基于 Microsoft.Data.SqlClient
简单的实现了所需的几个方法,如下所示:
MsSqlHelper.cs 类示例代码:
using System;
using System.Collections.Generic;
using System.Text;
using System.Data;
using Microsoft.Data.SqlTypes;
using Microsoft.Data.SqlClient;
using System.Reflection;
namespace DBHelper
{
public sealed class MsSqlHelper
{
#region 单利模式
/// <summary>
/// 1.构造函数私有化
/// </summary>
private MsSqlHelper() { }
/// <summary>
/// 2.创建私有化静态字段锁
/// </summary>
private static readonly object _ObjLock = new object();
/// <summary>
/// 3.创建私有化类对象,接收类的实例化对象
/// volatile 关键字促进线程安全,保障线程有序执行
/// </summary>
private static volatile MsSqlHelper _MsSqlHelper = null;
/// <summary>
/// 4.创建类实例化对象
/// </summary>
/// <returns></returns>
public static MsSqlHelper GetSingleObj()
{
if (_MsSqlHelper == null)
{
lock (_ObjLock) //保证只有一线程操作
{
if (_MsSqlHelper == null)
{
_MsSqlHelper = new MsSqlHelper();
}
}
}
return _MsSqlHelper;
}
#endregion
// 数据库链接字符串
private string _ConnString { get; set; }
/// <summary>
/// 初始化指定数据库桥接字符串
/// </summary>
/// <param name="sqlConnStrBuilder">数据库连接对象</param>
public void RegisterConn(Connection conn)
{
var builder = new SqlConnectionStringBuilder
{
DataSource = conn.DataSource, //your_server.database.windows.net
UserID = conn.UserID, //your_user
Password = conn.Password, //your_password
InitialCatalog = conn.InitialCatalog //your_database
};
_ConnString = builder.ConnectionString;
}
#region 单个数据(添加,更新,删除)
/// <summary>
/// 执行SQL语句,返回影响的记录数
/// Insert插入,Delete删除,Update更新
/// </summary>
/// <param name="cmdText">sql语句</param>
/// <param name="sqlParams">[可选]sql参数化</param>
/// <returns>int:受影响的行数</returns>
public int ExecNonQuery(string cmdText, params SqlParameter[] sqlParams)
{
int rowsCount = 0;
using (SqlConnection conn = new SqlConnection(_ConnString)) // 建立数据库连接对象
{
OpenConnection(conn);
using (SqlCommand cmd = conn.CreateCommand())
{
cmd.CommandType = CommandType.Text; //指定cmd命令类型为文本类型(默认,可不写);
cmd.CommandText = cmdText; //sql语句或存储过程
if (sqlParams != null && sqlParams.Length > 0) //检查参数组是否有数据
{
foreach (SqlParameter sqlParam in sqlParams)
{
//判断参数是否为null,是则转为数据库接受的DBnull
if (sqlParam.Value == null)
{
sqlParam.Value = DBNull.Value;
}
cmd.Parameters.Add(sqlParam); //参数格式化,防止sql注入
}
}
rowsCount = cmd.ExecuteNonQuery(); //执行非查询命令,接收受影响行数,大于0的话表示添加成功
cmd.Parameters.Clear();
}
CloseConnection(conn);
}
return rowsCount;
}
#endregion
#region 查询操作
/// <summary>
/// 返回数据库表DataTable
/// </summary>
/// <param name="cmdText">sql语句</param>
/// <param name="sqlParams">sql参数化</param>
/// <returns>DataTable</returns>
public DataTable GetDataTable(string cmdText, params SqlParameter[] sqlParams)
{
using (DataTable dt = new DataTable())
{
using (SqlConnection conn = new SqlConnection(_ConnString))
{
using (SqlCommand cmd = conn.CreateCommand())
{
cmd.CommandText = cmdText;
if (sqlParams != null && sqlParams.Length > 0)
{
foreach (SqlParameter parameter in sqlParams)
{
//判断参数是否为null,是则转为数据库接受的DBnull
if (parameter.Value == null)
{
parameter.Value = DBNull.Value;
}
//参数格式化,防止sql注入
cmd.Parameters.Add(parameter);
}
}
//适配器自动打开数据库连接
using (SqlDataAdapter da = new SqlDataAdapter(cmd))
{
da.Fill(dt);
}
cmd.Parameters.Clear();
}
}
return dt;
}
}
/// <summary>
/// 执行多sql语句,返回数据集
/// </summary>
/// <param name="sqlTuples">list:tabNames[可选],sql,SqlParameter[]</param>
/// <returns>DataSet</returns>
public DataSet GetDataSet(List<Tuple<string, string, SqlParameter[]>> sqlTuples)
{
using (DataSet ds = new DataSet())
{
string tabName = string.Empty; //tab名称
string cmdText = string.Empty; //sql语句
SqlParameter[] sqlParams = null;//sql参数化
if (sqlTuples != null && sqlTuples.Count > 0)
{
foreach (var tuple in sqlTuples)
{
tabName = tuple.Item1; //tab名称
cmdText = tuple.Item2; //sql语句
sqlParams = tuple.Item3;//sql格式化参数
using (SqlConnection conn = new SqlConnection(_ConnString))
{
using (SqlCommand cmd = conn.CreateCommand())
{
cmd.CommandText = cmdText;
//检查参数组是否有数据
if (sqlParams != null && sqlParams.Length > 0)
{
//cmd.Parameters.AddRange(sqlParams);
foreach (SqlParameter parameter in sqlParams)
{
//判断参数是否为null,是则转为数据库接受的DBnull
if (parameter.Value == null)
{
parameter.Value = DBNull.Value;
}
//参数格式化,防止sql注入
cmd.Parameters.Add(parameter);
}
}
//适配器自动打开数据库连接
using (SqlDataAdapter da = new SqlDataAdapter(cmd))
{
if (string.IsNullOrWhiteSpace(tabName))
da.Fill(ds);
else
da.Fill(ds, tabName); // 将tabName查询结果集合填入DataSet中,并且将DataTable命名为tabName
}
cmd.Parameters.Clear();
}
}
}
}
return ds;
}
}
#endregion
#region 批量添加数据
/// <summary>
/// 批量插入数据【INSERT INTO [TABLE] VALUES】,并返回受影响的行数
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="keyValuePair"></param>
/// <returns></returns>
public int InsertValuesToDB<T>(KeyValuePair<string,List<T>> keyValuePair)
{
string sql = $"INSERT INTO [{keyValuePair.Key}] @NAME VALUES @VALUES;";
var dicList = new List<Dictionary<string, string>>();
foreach (var item in keyValuePair.Value)
{
dicList.Add(GetProperties(item));
}
var name = new StringBuilder();
var values = new StringBuilder();
for (int i = 0; i < dicList.Count; i++)
{
var dic = dicList[i];
int columns = 0;
var tmpValue = new StringBuilder();
foreach (var item in dic)
{
if (i == 0)
{
if (columns < dic.Count -1)
{
name.Append($"[{item.Key}],");
tmpValue.Append($"'{item.Value}',");
}
else
{
name.Append($"[{item.Key}]");
tmpValue.Append($"'{item.Value}'");
}
}
else if(i >0 && i < dic.Count - 1)
{
if (columns < dic.Count - 1)
{
tmpValue.Append($"'{item.Value}',");
}
else
{
tmpValue.Append($"'{item.Value}'");
}
}
else
{
if (columns < dic.Count - 1)
{
tmpValue.Append($"'{item.Value}',");
}
else
{
tmpValue.Append($"'{item.Value}'");
}
}
columns++;
}
if (i == 0)
{
sql = sql.Replace("@NAME", $"({name.ToString()})");
}
if (i < dicList.Count - 1)
{
values.AppendLine($"({tmpValue.ToString()}),");
}
else
{
values.Append($"({tmpValue.ToString()})");
}
}
sql = sql.Replace("@VALUES", values.ToString());
return ExecNonQuery(sql);
}
/// <summary>
/// 反射得到实体类(泛型T)的字段名称和值
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="t"></param>
/// <returns></returns>
private Dictionary<string, string> GetProperties<T>(T t)
{
var ret = new Dictionary<string, string>();
if (t == null)
return null;
PropertyInfo[] properties = t.GetType().GetProperties(BindingFlags.Instance | BindingFlags.Public);
if (properties.Length <= 0)
return null;
foreach (PropertyInfo item in properties)
{
string name = item.Name; //实体类字段名称
string value = Convert.ToString(item.GetValue(t, null)); //该字段的值
//Type type = item.PropertyType; //获取值value的类型
if (item.PropertyType.IsValueType || item.PropertyType.Name.StartsWith("String"))
{
ret.Add(name, value); //在此可转换value的类型
}
}
return ret;
}
/// <summary>
/// 1.批量插入数据【Bulk】,并返回受影响的行数
/// </summary>
/// <param name="dt">DataTable</param>
/// <param name="dtName">表名称</param>
/// <returns>int:受影响的行数</returns>
public int InsertBulkToDB(DataTable dt, string dtName)
{
string tableName = dtName ?? dt.TableName;
int rowsCount = 0;
using (SqlConnection conn = new SqlConnection(_ConnString))
{
OpenConnection(conn);
using (SqlBulkCopy bulkCopy = new SqlBulkCopy(conn))
{
if (dt != null && dt.Rows.Count != 0)
{
bulkCopy.DestinationTableName = tableName; //数据表名称
bulkCopy.BatchSize = dt.Rows.Count;
bulkCopy.WriteToServer(dt);
rowsCount = bulkCopy.BatchSize;
}
CloseConnection(conn);
}
}
return rowsCount;
}
/// <summary>
/// 2.批量插入数据【Bulk】(加入内部事务),并返回受影响的行数
/// </summary>
/// <param name="dt">DataTable</param>
/// <param name="dtName">表名称</param>
/// <returns>int:受影响的行数</returns>
public int InsertBulkToDBByTransaction(DataTable dt, string dtName)
{
SqlTransaction transaction = null; // 创建事务对象
try
{
string tableName = dtName ?? dt.TableName;
int rowsCount = 0;
using (SqlConnection conn = new SqlConnection(_ConnString))
{
OpenConnection(conn);
using (SqlBulkCopy bulkCopy = new SqlBulkCopy(_ConnString, SqlBulkCopyOptions.UseInternalTransaction))
{
if (dt != null && dt.Rows.Count != 0)
{
bulkCopy.DestinationTableName = tableName; //数据表名称
bulkCopy.BatchSize = dt.Rows.Count;
bulkCopy.WriteToServer(dt);
rowsCount = bulkCopy.BatchSize;
}
CloseConnection(conn);
}
}
return rowsCount;
}
catch (Exception ex)
{
transaction.Rollback(); //事务回滚
throw new Exception(ex.Message, ex);
}
}
#endregion
#region 开启连接SqlConnection.Open
/// <summary>
/// 打开OracleConnection
/// </summary>
/// <param name="conn">数据库连接对象</param>
private static void OpenConnection(SqlConnection conn)
{
try
{
if (conn.State == ConnectionState.Closed) conn.Open();
}
catch (Exception ex)
{
throw new Exception(ex.Message, null);
}
}
#endregion
#region 关闭连接,释放资源
/// <summary>
/// 关闭Connection
/// </summary>
/// <param name="conn">数据库(Oracle)连接对象</param>
private static void CloseConnection(SqlConnection conn)
{
if (conn.State == ConnectionState.Open)
{
conn.Close();
conn.Dispose();//释放资源
}
}
/// <summary>
/// 关闭DataReader
/// </summary>
/// <param name="dataReader">数据读取器对象</param>
private static void CloseDataReader(SqlDataReader dataReader)
{
if (dataReader.IsClosed == false) dataReader.Close();
}
#endregion
}
}
Connection.cs 类示例代码:
using System;
using System.Collections.Generic;
using System.Text;
namespace DBHelper
{
/// <summary>
/// 数据库桥接对象模型
/// </summary>
public class Connection
{
/// <summary>
/// 数据库IP
/// </summary>
public string DataSource { get; set; }
/// <summary>
/// 数据库授权账户
/// </summary>
public string UserID { get; set; }
/// <summary>
/// 数据库访问密码
/// </summary>
public string Password { get; set; }
/// <summary>
/// 数据库名称
/// </summary>
public string InitialCatalog { get; set; }
}
}
DataSyncHelper 控制台调用测试:
using DBHelper;
using System;
using System.Collections.Generic;
using Microsoft.Data.SqlClient;
using System.Data;
namespace DataSyncHelper
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Hello World!");
//源数据库(本地)连接
var conn = new Connection
{
DataSource = "192.168.10.228",
UserID = "sa",
Password = "123456",
InitialCatalog = "TestDB"
};
//目标数据库连接(多个目标数据库添加List对象即可)
var connList = new List<Connection>
{
new Connection
{
DataSource = "192.168.10.228",
UserID = "sa",
Password = "123456",
InitialCatalog = "TestDB2"
}
};
Test1(conn);
Test2(conn, connList);
}
/// <summary>
/// 模拟数据库批量插入数据【INSERT INTO [TABLE] VALUES】
/// </summary>
/// <param name="conn">源数据库桥接对象</param>
static void Test1(Connection conn)
{
var ml = new List<TestTb>(); //模拟10条数据
for (int i = 0; i < 10; i++)
{
ml.Add(new TestTb
{
Id = Guid.NewGuid().ToString("N"),
Name = $"张三{i}",
No = i,
Birthday = DateTime.Now,
Remark = $"张三{i}"
});
}
var keyValuePair = new KeyValuePair<string, List<TestTb>>("TestTb", ml);
MsSqlHelper.GetSingleObj().RegisterConn(conn); //注册conn桥接对象
MsSqlHelper.GetSingleObj().InsertValuesToDB(keyValuePair);
}
/// <summary>
/// 同步数据(一主多从)
/// </summary>
/// <param name="conn">源数据库桥接对象</param>
/// <param name="connList">目标数据库桥接对象</param>
static void Test2(Connection conn, List<Connection> connList)
{
#region 源数据库(本地)连接
MsSqlHelper.GetSingleObj().RegisterConn(conn); //注册conn桥接对象
string sql = "SELECT * FROM [TestTb];";
var dt = MsSqlHelper.GetSingleObj().GetDataTable(sql);
dt.TableName = "TestTb";
#endregion
#region 目标数据库连接(Demo演示中暂时只有一个)
var dtDic = new Dictionary<Connection, DataTable>(); //原始数据集(保留原始表数据)
foreach (var item in connList)
{
MsSqlHelper.GetSingleObj().RegisterConn(item); //注册目标conn桥接对象
var oldDt = MsSqlHelper.GetSingleObj().GetDataTable(sql); //查询历史数据
dtDic.Add(item, oldDt);
string delSql = "DELETE FROM [TestTb];"; //全表删除历史数据
int rcount = MsSqlHelper.GetSingleObj().ExecNonQuery(delSql);
int newRCount = MsSqlHelper.GetSingleObj().InsertBulkToDB(dt, dt.TableName);
}
#endregion
}
}
/// <summary>
/// 原始表模型
/// </summary>
public class TestTb
{
public string Id { get; set; }
public string Name { get; set; }
public int? No { get; set; }
public DateTime? Birthday { get; set; }
public string Remark { get; set; }
}
}
测试结果
首先给源数据库【TestDB
】模拟 10
条数据 =》 Test1
方法,执行结果如下:
同步目标数据库【TestDB2
】中的【TestTB
】对象 =》 Test2
方法,执行结果如下:
以上 Demo
演示完毕,在生产环境中可根据自己的实际情况修改调整。