.NET Core 3.0 控制台实现数据同步

简介: 写作背景:应工作环境中存在一个数据库实例多站点部署模式,每次同步数据都需要手动从本地导入目标站点数据库,空余之际写了个简单 Demo;技术点或 Nuget 元包:.NET Core 3.0Console;Microsoft.Data.SqlClient -v 1.0.19269.1;开发工具 VS 2019 Pro x64 v16.3.3;MS-SQLServer 2014 Enterprise ...

写作背景

因工作环境中存在一个数据库实例多站点部署模式,每次同步数据都需要手动从本地导入目标站点数据库,空余之际写了个简单 Demo;

技术点或Nuget元包:

  • .NET Core 3.0 Console;
  • Microsoft.Data.SqlClient -v1.0.19269.1;
  • 开发工具 VS 2019 Pro x64 v16.3.3;
  • MS-SQLServer 2014 Enterprise;

实现目标:

本地开发环境的正式数据同步到线上目标数据库(单表全字段同结构模式);

说明:这里使用本地环境同数据库类型的不同 DB 实例模拟实现,假如数据库【TestDB】为源数据库,【TestDB2】为目标数据库,都有共同的数据表对象【TestTB】;

Demo 项目结构:

image.png

DHHelper 封装

基于 Microsoft.Data.SqlClient 简单的实现了所需的几个方法,如下所示:

MsSqlHelper.cs 类示例代码:

using System;
using System.Collections.Generic;
using System.Text;
using System.Data;
using Microsoft.Data.SqlTypes;
using Microsoft.Data.SqlClient;
using System.Reflection;
 
namespace DBHelper
{
    public sealed class MsSqlHelper
    {
        #region 单利模式
        /// <summary>
        /// 1.构造函数私有化
        /// </summary>
        private MsSqlHelper() { }
 
        /// <summary>
        /// 2.创建私有化静态字段锁
        /// </summary>
        private static readonly object _ObjLock = new object();
 
        /// <summary>
        /// 3.创建私有化类对象,接收类的实例化对象
        /// volatile 关键字促进线程安全,保障线程有序执行
        /// </summary>
        private static volatile MsSqlHelper _MsSqlHelper = null;
 
        /// <summary>
        /// 4.创建类实例化对象
        /// </summary>
        /// <returns></returns>
        public static MsSqlHelper GetSingleObj()
        {
            if (_MsSqlHelper == null)
            {
                lock (_ObjLock) //保证只有一线程操作
                {
                    if (_MsSqlHelper == null)
                    {
                        _MsSqlHelper = new MsSqlHelper();
                    }
                }
            }
            return _MsSqlHelper;
        }
        #endregion
 
        // 数据库链接字符串
        private string _ConnString { get; set; }
 
        /// <summary>
        /// 初始化指定数据库桥接字符串
        /// </summary>
        /// <param name="sqlConnStrBuilder">数据库连接对象</param>
        public void RegisterConn(Connection conn)
        {
            var builder = new SqlConnectionStringBuilder
            {
                DataSource = conn.DataSource, //your_server.database.windows.net
                UserID = conn.UserID, //your_user  
                Password = conn.Password, //your_password  
                InitialCatalog = conn.InitialCatalog //your_database  
            };
            _ConnString = builder.ConnectionString;
        }
 
        #region 单个数据(添加,更新,删除)
        /// <summary>  
        /// 执行SQL语句,返回影响的记录数  
        /// Insert插入,Delete删除,Update更新  
        /// </summary>  
        /// <param name="cmdText">sql语句</param>  
        /// <param name="sqlParams">[可选]sql参数化</param>  
        /// <returns>int:受影响的行数</returns>  
        public int ExecNonQuery(string cmdText, params SqlParameter[] sqlParams)
        {
            int rowsCount = 0;
            using (SqlConnection conn = new SqlConnection(_ConnString))  // 建立数据库连接对象  
            {
                OpenConnection(conn);
                using (SqlCommand cmd = conn.CreateCommand())
                {
                    cmd.CommandType = CommandType.Text; //指定cmd命令类型为文本类型(默认,可不写);  
                    cmd.CommandText = cmdText; //sql语句或存储过程                         
                    if (sqlParams != null && sqlParams.Length > 0) //检查参数组是否有数据  
                    {
                        foreach (SqlParameter sqlParam in sqlParams)
                        {
                            //判断参数是否为null,是则转为数据库接受的DBnull
                            if (sqlParam.Value == null)
                            {
                                sqlParam.Value = DBNull.Value;
                            }
                            cmd.Parameters.Add(sqlParam); //参数格式化,防止sql注入  
                        }
                    }
                    rowsCount = cmd.ExecuteNonQuery(); //执行非查询命令,接收受影响行数,大于0的话表示添加成功  
                    cmd.Parameters.Clear();
                }
                CloseConnection(conn);
            }
            return rowsCount;
        }
        #endregion
 
        #region 查询操作
        /// <summary>  
        /// 返回数据库表DataTable  
        /// </summary>  
        /// <param name="cmdText">sql语句</param>  
        /// <param name="sqlParams">sql参数化</param>  
        /// <returns>DataTable</returns>  
        public DataTable GetDataTable(string cmdText, params SqlParameter[] sqlParams)
        {
            using (DataTable dt = new DataTable())
            {
                using (SqlConnection conn = new SqlConnection(_ConnString))
                {
                    using (SqlCommand cmd = conn.CreateCommand())
                    {
                        cmd.CommandText = cmdText;
                        if (sqlParams != null && sqlParams.Length > 0)
                        {
                            foreach (SqlParameter parameter in sqlParams)
                            {
                                //判断参数是否为null,是则转为数据库接受的DBnull
                                if (parameter.Value == null)
                                {
                                    parameter.Value = DBNull.Value;
                                }
                                //参数格式化,防止sql注入  
                                cmd.Parameters.Add(parameter);
                            }
                        }
                        //适配器自动打开数据库连接  
                        using (SqlDataAdapter da = new SqlDataAdapter(cmd))
                        {
                            da.Fill(dt);
                        }
                        cmd.Parameters.Clear();
                    }
                }
                return dt;
            }
        }
 
        /// <summary>
        /// 执行多sql语句,返回数据集
        /// </summary>
        /// <param name="sqlTuples">list:tabNames[可选],sql,SqlParameter[]</param>
        /// <returns>DataSet</returns>
        public DataSet GetDataSet(List<Tuple<string, string, SqlParameter[]>> sqlTuples)
        {
            using (DataSet ds = new DataSet())
            {
                string tabName = string.Empty; //tab名称
                string cmdText = string.Empty; //sql语句   
                SqlParameter[] sqlParams = null;//sql参数化  
 
                if (sqlTuples != null && sqlTuples.Count > 0)
                {
                    foreach (var tuple in sqlTuples)
                    {
                        tabName = tuple.Item1; //tab名称
                        cmdText = tuple.Item2; //sql语句   
                        sqlParams = tuple.Item3;//sql格式化参数
                        using (SqlConnection conn = new SqlConnection(_ConnString))
                        {
                            using (SqlCommand cmd = conn.CreateCommand())
                            {
                                cmd.CommandText = cmdText;
                                //检查参数组是否有数据  
                                if (sqlParams != null && sqlParams.Length > 0)
                                {
                                    //cmd.Parameters.AddRange(sqlParams);
                                    foreach (SqlParameter parameter in sqlParams)
                                    {
                                        //判断参数是否为null,是则转为数据库接受的DBnull
                                        if (parameter.Value == null)
                                        {
                                            parameter.Value = DBNull.Value;
                                        }
                                        //参数格式化,防止sql注入  
                                        cmd.Parameters.Add(parameter);
                                    }
                                }
 
                                //适配器自动打开数据库连接  
                                using (SqlDataAdapter da = new SqlDataAdapter(cmd))
                                {
                                    if (string.IsNullOrWhiteSpace(tabName))
                                        da.Fill(ds);
                                    else
                                        da.Fill(ds, tabName); // 将tabName查询结果集合填入DataSet中,并且将DataTable命名为tabName  
                                }
                                cmd.Parameters.Clear();
                            }
                        }
                    }
                }
                return ds;
            }
        }
        #endregion
 
        #region 批量添加数据
        /// <summary>
        /// 批量插入数据【INSERT INTO [TABLE] VALUES】,并返回受影响的行数  
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="keyValuePair"></param>
        /// <returns></returns>
        public int InsertValuesToDB<T>(KeyValuePair<string,List<T>> keyValuePair) 
        {
            string sql = $"INSERT INTO [{keyValuePair.Key}] @NAME VALUES @VALUES;";
            var dicList = new List<Dictionary<string, string>>();
            foreach (var item in keyValuePair.Value)
            {
                dicList.Add(GetProperties(item));
            }
 
            var name = new StringBuilder();
            var values = new StringBuilder();
            for (int i = 0; i < dicList.Count; i++)
            {
                var dic = dicList[i];
                int columns = 0;
                var tmpValue = new StringBuilder();
                foreach (var item in dic)
                {
                    if (i == 0)
                    {
                        if (columns < dic.Count -1)
                        {
                            name.Append($"[{item.Key}],");
                            tmpValue.Append($"'{item.Value}',");
                        }
                        else
                        {
                            name.Append($"[{item.Key}]");
                            tmpValue.Append($"'{item.Value}'");
                        }
                    }
                    else if(i >0 && i < dic.Count - 1)
                    {
                        if (columns < dic.Count - 1)
                        {
                            tmpValue.Append($"'{item.Value}',");
                        }
                        else
                        {
                            tmpValue.Append($"'{item.Value}'");
                        }
                    }
                    else
                    {
                        if (columns < dic.Count - 1)
                        {
                            tmpValue.Append($"'{item.Value}',");
                        }
                        else
                        {
                            tmpValue.Append($"'{item.Value}'");
                        }
                    }
                    columns++;
                }
                if (i == 0)
                {
                    sql = sql.Replace("@NAME", $"({name.ToString()})");
                }
                if (i < dicList.Count - 1)
                {
                    values.AppendLine($"({tmpValue.ToString()}),");
                }
                else
                {
                    values.Append($"({tmpValue.ToString()})");
                }
            }
            sql = sql.Replace("@VALUES", values.ToString());
            return ExecNonQuery(sql);
        }
 
        /// <summary>
        /// 反射得到实体类(泛型T)的字段名称和值
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="t"></param>
        /// <returns></returns>
        private Dictionary<string, string> GetProperties<T>(T t)
        {
            var ret = new Dictionary<string, string>();
            if (t == null)
                return null;
 
            PropertyInfo[] properties = t.GetType().GetProperties(BindingFlags.Instance | BindingFlags.Public);
            if (properties.Length <= 0)
                return null;
 
            foreach (PropertyInfo item in properties)
            {
                string name = item.Name; //实体类字段名称
                string value = Convert.ToString(item.GetValue(t, null)); //该字段的值
                //Type type = item.PropertyType; //获取值value的类型
                if (item.PropertyType.IsValueType || item.PropertyType.Name.StartsWith("String"))
                {
                    ret.Add(name, value); //在此可转换value的类型
                }
            }
            return ret;
        }
 
 
        /// <summary>  
        /// 1.批量插入数据【Bulk】,并返回受影响的行数  
        /// </summary>  
        /// <param name="dt">DataTable</param> 
        /// <param name="dtName">表名称</param>  
        /// <returns>int:受影响的行数</returns>  
        public int InsertBulkToDB(DataTable dt, string dtName)
        {
            string tableName = dtName ?? dt.TableName;
            int rowsCount = 0;
            using (SqlConnection conn = new SqlConnection(_ConnString))
            {
                OpenConnection(conn);
                using (SqlBulkCopy bulkCopy = new SqlBulkCopy(conn))
                {
                    if (dt != null && dt.Rows.Count != 0)
                    {
                        bulkCopy.DestinationTableName = tableName; //数据表名称  
                        bulkCopy.BatchSize = dt.Rows.Count;
                        bulkCopy.WriteToServer(dt);
                        rowsCount = bulkCopy.BatchSize;
                    }
                    CloseConnection(conn);
                }
            }
            return rowsCount;
        }
        /// <summary>  
        /// 2.批量插入数据【Bulk】(加入内部事务),并返回受影响的行数  
        /// </summary>  
        /// <param name="dt">DataTable</param> 
        /// <param name="dtName">表名称</param>  
        /// <returns>int:受影响的行数</returns>  
        public int InsertBulkToDBByTransaction(DataTable dt, string dtName)
        {
            SqlTransaction transaction = null; // 创建事务对象  
            try
            {
                string tableName = dtName ?? dt.TableName;
                int rowsCount = 0;
                using (SqlConnection conn = new SqlConnection(_ConnString))
                {
                    OpenConnection(conn);
                    using (SqlBulkCopy bulkCopy = new SqlBulkCopy(_ConnString, SqlBulkCopyOptions.UseInternalTransaction))
                    {
                        if (dt != null && dt.Rows.Count != 0)
                        {
                            bulkCopy.DestinationTableName = tableName; //数据表名称  
                            bulkCopy.BatchSize = dt.Rows.Count;
                            bulkCopy.WriteToServer(dt);
                            rowsCount = bulkCopy.BatchSize;
                        }
                        CloseConnection(conn);
                    }
                }
                return rowsCount;
            }
            catch (Exception ex)
            {
                transaction.Rollback(); //事务回滚  
                throw new Exception(ex.Message, ex);
            }
        }
        #endregion
 
        #region 开启连接SqlConnection.Open  
        /// <summary>  
        /// 打开OracleConnection  
        /// </summary>  
        /// <param name="conn">数据库连接对象</param>  
        private static void OpenConnection(SqlConnection conn)
        {
            try
            {
                if (conn.State == ConnectionState.Closed) conn.Open();
            }
            catch (Exception ex)
            {
                throw new Exception(ex.Message, null);
            }
        }
        #endregion
 
        #region 关闭连接,释放资源  
        /// <summary>  
        /// 关闭Connection  
        /// </summary>  
        /// <param name="conn">数据库(Oracle)连接对象</param>  
        private static void CloseConnection(SqlConnection conn)
        {
            if (conn.State == ConnectionState.Open)
            {
                conn.Close();
                conn.Dispose();//释放资源  
            }
        }
 
        /// <summary>  
        /// 关闭DataReader  
        /// </summary>  
        /// <param name="dataReader">数据读取器对象</param>  
        private static void CloseDataReader(SqlDataReader dataReader)
        {
            if (dataReader.IsClosed == false) dataReader.Close();
        }
        #endregion
    }
}

Connection.cs 类示例代码:

using System;
using System.Collections.Generic;
using System.Text;
 
namespace DBHelper
{
    /// <summary>
    /// 数据库桥接对象模型
    /// </summary>
    public class Connection
    {
        /// <summary>
        /// 数据库IP
        /// </summary>
        public string DataSource { get; set; }
        /// <summary>
        /// 数据库授权账户
        /// </summary>
        public string UserID { get; set; }
        /// <summary>
        /// 数据库访问密码
        /// </summary>
        public string Password { get; set; }
        /// <summary>
        /// 数据库名称
        /// </summary>
        public string InitialCatalog { get; set; } 
    }
}

DataSyncHelper 控制台调用测试:

using DBHelper;
using System;
using System.Collections.Generic;
using Microsoft.Data.SqlClient;
using System.Data;
 
namespace DataSyncHelper
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Hello World!");
 
            //源数据库(本地)连接
            var conn = new Connection
            {
                DataSource = "192.168.10.228",
                UserID = "sa",
                Password = "123456",
                InitialCatalog = "TestDB"
            };
 
            //目标数据库连接(多个目标数据库添加List对象即可)
            var connList = new List<Connection> 
            {
                new Connection
                {
                    DataSource = "192.168.10.228",
                    UserID = "sa",
                    Password = "123456",
                    InitialCatalog = "TestDB2"
                }
            };
 
 
            Test1(conn);
 
            Test2(conn, connList);
 
        }
 
        /// <summary>
        /// 模拟数据库批量插入数据【INSERT INTO [TABLE] VALUES】
        /// </summary>
        /// <param name="conn">源数据库桥接对象</param>
        static void Test1(Connection conn) 
        {
            var ml = new List<TestTb>(); //模拟10条数据
            for (int i = 0; i < 10; i++)
            {
                ml.Add(new TestTb
                {
                    Id = Guid.NewGuid().ToString("N"),
                    Name = $"张三{i}",
                    No = i,
                    Birthday = DateTime.Now,
                    Remark = $"张三{i}"
                });
            }
            var keyValuePair = new KeyValuePair<string, List<TestTb>>("TestTb", ml);
            MsSqlHelper.GetSingleObj().RegisterConn(conn); //注册conn桥接对象
            MsSqlHelper.GetSingleObj().InsertValuesToDB(keyValuePair);
        }
 
 
        /// <summary>
        /// 同步数据(一主多从)
        /// </summary>
        /// <param name="conn">源数据库桥接对象</param>
        /// <param name="connList">目标数据库桥接对象</param>
        static void Test2(Connection conn, List<Connection> connList) 
        {
            #region 源数据库(本地)连接
            MsSqlHelper.GetSingleObj().RegisterConn(conn); //注册conn桥接对象
            string sql = "SELECT * FROM [TestTb];";
            var dt = MsSqlHelper.GetSingleObj().GetDataTable(sql);
            dt.TableName = "TestTb";
            #endregion
 
            #region 目标数据库连接(Demo演示中暂时只有一个)
            var dtDic = new Dictionary<Connection, DataTable>(); //原始数据集(保留原始表数据)
            foreach (var item in connList)
            {
                MsSqlHelper.GetSingleObj().RegisterConn(item); //注册目标conn桥接对象
                var oldDt = MsSqlHelper.GetSingleObj().GetDataTable(sql); //查询历史数据
                dtDic.Add(item, oldDt);
                string delSql = "DELETE FROM [TestTb];"; //全表删除历史数据
                int rcount = MsSqlHelper.GetSingleObj().ExecNonQuery(delSql);
                int newRCount = MsSqlHelper.GetSingleObj().InsertBulkToDB(dt, dt.TableName);
            }
            #endregion
        }
 
    }
 
    /// <summary>
    /// 原始表模型
    /// </summary>
    public class TestTb 
    {
        public string Id { get; set; }
        public string Name { get; set; }
        public int? No { get; set; } 
        public DateTime? Birthday { get; set; } 
        public string Remark { get; set; }
    }
}

测试结果

首先给源数据库【TestDB】模拟 10 条数据 =》 Test1 方法,执行结果如下:

image.png

同步目标数据库【TestDB2】中的【TestTB】对象 =》 Test2 方法,执行结果如下:

image.png

以上 Demo 演示完毕,在生产环境中可根据自己的实际情况修改调整。

目录
相关文章
|
18天前
|
数据可视化 网络协议 C#
C#/.NET/.NET Core优秀项目和框架2024年3月简报
公众号每月定期推广和分享的C#/.NET/.NET Core优秀项目和框架(每周至少会推荐两个优秀的项目和框架当然节假日除外),公众号推文中有项目和框架的介绍、功能特点、使用方式以及部分功能截图等(打不开或者打开GitHub很慢的同学可以优先查看公众号推文,文末一定会附带项目和框架源码地址)。注意:排名不分先后,都是十分优秀的开源项目和框架,每周定期更新分享(欢迎关注公众号:追逐时光者,第一时间获取每周精选分享资讯🔔)。
|
3月前
|
开发框架 前端开发 JavaScript
盘点72个ASP.NET Core源码Net爱好者不容错过
盘点72个ASP.NET Core源码Net爱好者不容错过
71 0
|
3月前
|
开发框架 .NET
ASP.NET Core NET7 增加session的方法
ASP.NET Core NET7 增加session的方法
37 0
|
3月前
|
开发框架 JavaScript .NET
ASP.NET Core的超级大BUG
ASP.NET Core的超级大BUG
43 0
|
4月前
|
存储 安全 编译器
|
3月前
|
开发框架 前端开发 .NET
ASP.NET CORE 3.1 MVC“指定的网络名不再可用\企图在不存在的网络连接上进行操作”的问题解决过程
ASP.NET CORE 3.1 MVC“指定的网络名不再可用\企图在不存在的网络连接上进行操作”的问题解决过程
42 0
|
1月前
|
开发框架 人工智能 .NET
C#/.NET/.NET Core拾遗补漏合集(持续更新)
C#/.NET/.NET Core拾遗补漏合集(持续更新)
|
1月前
|
开发框架 中间件 .NET
C# .NET面试系列七:ASP.NET Core
## 第一部分:ASP.NET Core #### 1. 如何在 controller 中注入 service? 在.NET中,在ASP.NET Core应用程序中的Controller中注入服务通常使用<u>依赖注入(Dependency Injection)</u>来实现。以下是一些步骤,说明如何在Controller中注入服务: 1、创建服务 首先,确保你已经在应用程序中注册了服务。这通常在Startup.cs文件的ConfigureServices方法中完成。例如: ```c# services.AddScoped<IMyService, MyService>(); //
65 0
|
2月前
|
开发框架 前端开发 .NET
福利来袭,.NET Core开发5大案例,30w字PDF文档大放送!!!
为了便于大家查找,特将之前开发的.Net Core相关的五大案例整理成文,共计440页,32w字,免费提供给大家,文章底部有PDF下载链接。
35 1
福利来袭,.NET Core开发5大案例,30w字PDF文档大放送!!!
|
2月前
|
算法 BI API
C#/.NET/.NET Core优秀项目和框架2024年1月简报
C#/.NET/.NET Core优秀项目和框架2024年1月简报