.NET Core 3.0 控制台实现数据同步

简介: 写作背景:应工作环境中存在一个数据库实例多站点部署模式,每次同步数据都需要手动从本地导入目标站点数据库,空余之际写了个简单 Demo;技术点或 Nuget 元包:.NET Core 3.0Console;Microsoft.Data.SqlClient -v 1.0.19269.1;开发工具 VS 2019 Pro x64 v16.3.3;MS-SQLServer 2014 Enterprise ...

写作背景

因工作环境中存在一个数据库实例多站点部署模式,每次同步数据都需要手动从本地导入目标站点数据库,空余之际写了个简单 Demo;

技术点或Nuget元包:

  • .NET Core 3.0 Console;
  • Microsoft.Data.SqlClient -v1.0.19269.1;
  • 开发工具 VS 2019 Pro x64 v16.3.3;
  • MS-SQLServer 2014 Enterprise;

实现目标:

本地开发环境的正式数据同步到线上目标数据库(单表全字段同结构模式);

说明:这里使用本地环境同数据库类型的不同 DB 实例模拟实现,假如数据库【TestDB】为源数据库,【TestDB2】为目标数据库,都有共同的数据表对象【TestTB】;

Demo 项目结构:

image.png

DHHelper 封装

基于 Microsoft.Data.SqlClient 简单的实现了所需的几个方法,如下所示:

MsSqlHelper.cs 类示例代码:

using System;
using System.Collections.Generic;
using System.Text;
using System.Data;
using Microsoft.Data.SqlTypes;
using Microsoft.Data.SqlClient;
using System.Reflection;
 
namespace DBHelper
{
    public sealed class MsSqlHelper
    {
        #region 单利模式
        /// <summary>
        /// 1.构造函数私有化
        /// </summary>
        private MsSqlHelper() { }
 
        /// <summary>
        /// 2.创建私有化静态字段锁
        /// </summary>
        private static readonly object _ObjLock = new object();
 
        /// <summary>
        /// 3.创建私有化类对象,接收类的实例化对象
        /// volatile 关键字促进线程安全,保障线程有序执行
        /// </summary>
        private static volatile MsSqlHelper _MsSqlHelper = null;
 
        /// <summary>
        /// 4.创建类实例化对象
        /// </summary>
        /// <returns></returns>
        public static MsSqlHelper GetSingleObj()
        {
            if (_MsSqlHelper == null)
            {
                lock (_ObjLock) //保证只有一线程操作
                {
                    if (_MsSqlHelper == null)
                    {
                        _MsSqlHelper = new MsSqlHelper();
                    }
                }
            }
            return _MsSqlHelper;
        }
        #endregion
 
        // 数据库链接字符串
        private string _ConnString { get; set; }
 
        /// <summary>
        /// 初始化指定数据库桥接字符串
        /// </summary>
        /// <param name="sqlConnStrBuilder">数据库连接对象</param>
        public void RegisterConn(Connection conn)
        {
            var builder = new SqlConnectionStringBuilder
            {
                DataSource = conn.DataSource, //your_server.database.windows.net
                UserID = conn.UserID, //your_user  
                Password = conn.Password, //your_password  
                InitialCatalog = conn.InitialCatalog //your_database  
            };
            _ConnString = builder.ConnectionString;
        }
 
        #region 单个数据(添加,更新,删除)
        /// <summary>  
        /// 执行SQL语句,返回影响的记录数  
        /// Insert插入,Delete删除,Update更新  
        /// </summary>  
        /// <param name="cmdText">sql语句</param>  
        /// <param name="sqlParams">[可选]sql参数化</param>  
        /// <returns>int:受影响的行数</returns>  
        public int ExecNonQuery(string cmdText, params SqlParameter[] sqlParams)
        {
            int rowsCount = 0;
            using (SqlConnection conn = new SqlConnection(_ConnString))  // 建立数据库连接对象  
            {
                OpenConnection(conn);
                using (SqlCommand cmd = conn.CreateCommand())
                {
                    cmd.CommandType = CommandType.Text; //指定cmd命令类型为文本类型(默认,可不写);  
                    cmd.CommandText = cmdText; //sql语句或存储过程                         
                    if (sqlParams != null && sqlParams.Length > 0) //检查参数组是否有数据  
                    {
                        foreach (SqlParameter sqlParam in sqlParams)
                        {
                            //判断参数是否为null,是则转为数据库接受的DBnull
                            if (sqlParam.Value == null)
                            {
                                sqlParam.Value = DBNull.Value;
                            }
                            cmd.Parameters.Add(sqlParam); //参数格式化,防止sql注入  
                        }
                    }
                    rowsCount = cmd.ExecuteNonQuery(); //执行非查询命令,接收受影响行数,大于0的话表示添加成功  
                    cmd.Parameters.Clear();
                }
                CloseConnection(conn);
            }
            return rowsCount;
        }
        #endregion
 
        #region 查询操作
        /// <summary>  
        /// 返回数据库表DataTable  
        /// </summary>  
        /// <param name="cmdText">sql语句</param>  
        /// <param name="sqlParams">sql参数化</param>  
        /// <returns>DataTable</returns>  
        public DataTable GetDataTable(string cmdText, params SqlParameter[] sqlParams)
        {
            using (DataTable dt = new DataTable())
            {
                using (SqlConnection conn = new SqlConnection(_ConnString))
                {
                    using (SqlCommand cmd = conn.CreateCommand())
                    {
                        cmd.CommandText = cmdText;
                        if (sqlParams != null && sqlParams.Length > 0)
                        {
                            foreach (SqlParameter parameter in sqlParams)
                            {
                                //判断参数是否为null,是则转为数据库接受的DBnull
                                if (parameter.Value == null)
                                {
                                    parameter.Value = DBNull.Value;
                                }
                                //参数格式化,防止sql注入  
                                cmd.Parameters.Add(parameter);
                            }
                        }
                        //适配器自动打开数据库连接  
                        using (SqlDataAdapter da = new SqlDataAdapter(cmd))
                        {
                            da.Fill(dt);
                        }
                        cmd.Parameters.Clear();
                    }
                }
                return dt;
            }
        }
 
        /// <summary>
        /// 执行多sql语句,返回数据集
        /// </summary>
        /// <param name="sqlTuples">list:tabNames[可选],sql,SqlParameter[]</param>
        /// <returns>DataSet</returns>
        public DataSet GetDataSet(List<Tuple<string, string, SqlParameter[]>> sqlTuples)
        {
            using (DataSet ds = new DataSet())
            {
                string tabName = string.Empty; //tab名称
                string cmdText = string.Empty; //sql语句   
                SqlParameter[] sqlParams = null;//sql参数化  
 
                if (sqlTuples != null && sqlTuples.Count > 0)
                {
                    foreach (var tuple in sqlTuples)
                    {
                        tabName = tuple.Item1; //tab名称
                        cmdText = tuple.Item2; //sql语句   
                        sqlParams = tuple.Item3;//sql格式化参数
                        using (SqlConnection conn = new SqlConnection(_ConnString))
                        {
                            using (SqlCommand cmd = conn.CreateCommand())
                            {
                                cmd.CommandText = cmdText;
                                //检查参数组是否有数据  
                                if (sqlParams != null && sqlParams.Length > 0)
                                {
                                    //cmd.Parameters.AddRange(sqlParams);
                                    foreach (SqlParameter parameter in sqlParams)
                                    {
                                        //判断参数是否为null,是则转为数据库接受的DBnull
                                        if (parameter.Value == null)
                                        {
                                            parameter.Value = DBNull.Value;
                                        }
                                        //参数格式化,防止sql注入  
                                        cmd.Parameters.Add(parameter);
                                    }
                                }
 
                                //适配器自动打开数据库连接  
                                using (SqlDataAdapter da = new SqlDataAdapter(cmd))
                                {
                                    if (string.IsNullOrWhiteSpace(tabName))
                                        da.Fill(ds);
                                    else
                                        da.Fill(ds, tabName); // 将tabName查询结果集合填入DataSet中,并且将DataTable命名为tabName  
                                }
                                cmd.Parameters.Clear();
                            }
                        }
                    }
                }
                return ds;
            }
        }
        #endregion
 
        #region 批量添加数据
        /// <summary>
        /// 批量插入数据【INSERT INTO [TABLE] VALUES】,并返回受影响的行数  
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="keyValuePair"></param>
        /// <returns></returns>
        public int InsertValuesToDB<T>(KeyValuePair<string,List<T>> keyValuePair) 
        {
            string sql = $"INSERT INTO [{keyValuePair.Key}] @NAME VALUES @VALUES;";
            var dicList = new List<Dictionary<string, string>>();
            foreach (var item in keyValuePair.Value)
            {
                dicList.Add(GetProperties(item));
            }
 
            var name = new StringBuilder();
            var values = new StringBuilder();
            for (int i = 0; i < dicList.Count; i++)
            {
                var dic = dicList[i];
                int columns = 0;
                var tmpValue = new StringBuilder();
                foreach (var item in dic)
                {
                    if (i == 0)
                    {
                        if (columns < dic.Count -1)
                        {
                            name.Append($"[{item.Key}],");
                            tmpValue.Append($"'{item.Value}',");
                        }
                        else
                        {
                            name.Append($"[{item.Key}]");
                            tmpValue.Append($"'{item.Value}'");
                        }
                    }
                    else if(i >0 && i < dic.Count - 1)
                    {
                        if (columns < dic.Count - 1)
                        {
                            tmpValue.Append($"'{item.Value}',");
                        }
                        else
                        {
                            tmpValue.Append($"'{item.Value}'");
                        }
                    }
                    else
                    {
                        if (columns < dic.Count - 1)
                        {
                            tmpValue.Append($"'{item.Value}',");
                        }
                        else
                        {
                            tmpValue.Append($"'{item.Value}'");
                        }
                    }
                    columns++;
                }
                if (i == 0)
                {
                    sql = sql.Replace("@NAME", $"({name.ToString()})");
                }
                if (i < dicList.Count - 1)
                {
                    values.AppendLine($"({tmpValue.ToString()}),");
                }
                else
                {
                    values.Append($"({tmpValue.ToString()})");
                }
            }
            sql = sql.Replace("@VALUES", values.ToString());
            return ExecNonQuery(sql);
        }
 
        /// <summary>
        /// 反射得到实体类(泛型T)的字段名称和值
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="t"></param>
        /// <returns></returns>
        private Dictionary<string, string> GetProperties<T>(T t)
        {
            var ret = new Dictionary<string, string>();
            if (t == null)
                return null;
 
            PropertyInfo[] properties = t.GetType().GetProperties(BindingFlags.Instance | BindingFlags.Public);
            if (properties.Length <= 0)
                return null;
 
            foreach (PropertyInfo item in properties)
            {
                string name = item.Name; //实体类字段名称
                string value = Convert.ToString(item.GetValue(t, null)); //该字段的值
                //Type type = item.PropertyType; //获取值value的类型
                if (item.PropertyType.IsValueType || item.PropertyType.Name.StartsWith("String"))
                {
                    ret.Add(name, value); //在此可转换value的类型
                }
            }
            return ret;
        }
 
 
        /// <summary>  
        /// 1.批量插入数据【Bulk】,并返回受影响的行数  
        /// </summary>  
        /// <param name="dt">DataTable</param> 
        /// <param name="dtName">表名称</param>  
        /// <returns>int:受影响的行数</returns>  
        public int InsertBulkToDB(DataTable dt, string dtName)
        {
            string tableName = dtName ?? dt.TableName;
            int rowsCount = 0;
            using (SqlConnection conn = new SqlConnection(_ConnString))
            {
                OpenConnection(conn);
                using (SqlBulkCopy bulkCopy = new SqlBulkCopy(conn))
                {
                    if (dt != null && dt.Rows.Count != 0)
                    {
                        bulkCopy.DestinationTableName = tableName; //数据表名称  
                        bulkCopy.BatchSize = dt.Rows.Count;
                        bulkCopy.WriteToServer(dt);
                        rowsCount = bulkCopy.BatchSize;
                    }
                    CloseConnection(conn);
                }
            }
            return rowsCount;
        }
        /// <summary>  
        /// 2.批量插入数据【Bulk】(加入内部事务),并返回受影响的行数  
        /// </summary>  
        /// <param name="dt">DataTable</param> 
        /// <param name="dtName">表名称</param>  
        /// <returns>int:受影响的行数</returns>  
        public int InsertBulkToDBByTransaction(DataTable dt, string dtName)
        {
            SqlTransaction transaction = null; // 创建事务对象  
            try
            {
                string tableName = dtName ?? dt.TableName;
                int rowsCount = 0;
                using (SqlConnection conn = new SqlConnection(_ConnString))
                {
                    OpenConnection(conn);
                    using (SqlBulkCopy bulkCopy = new SqlBulkCopy(_ConnString, SqlBulkCopyOptions.UseInternalTransaction))
                    {
                        if (dt != null && dt.Rows.Count != 0)
                        {
                            bulkCopy.DestinationTableName = tableName; //数据表名称  
                            bulkCopy.BatchSize = dt.Rows.Count;
                            bulkCopy.WriteToServer(dt);
                            rowsCount = bulkCopy.BatchSize;
                        }
                        CloseConnection(conn);
                    }
                }
                return rowsCount;
            }
            catch (Exception ex)
            {
                transaction.Rollback(); //事务回滚  
                throw new Exception(ex.Message, ex);
            }
        }
        #endregion
 
        #region 开启连接SqlConnection.Open  
        /// <summary>  
        /// 打开OracleConnection  
        /// </summary>  
        /// <param name="conn">数据库连接对象</param>  
        private static void OpenConnection(SqlConnection conn)
        {
            try
            {
                if (conn.State == ConnectionState.Closed) conn.Open();
            }
            catch (Exception ex)
            {
                throw new Exception(ex.Message, null);
            }
        }
        #endregion
 
        #region 关闭连接,释放资源  
        /// <summary>  
        /// 关闭Connection  
        /// </summary>  
        /// <param name="conn">数据库(Oracle)连接对象</param>  
        private static void CloseConnection(SqlConnection conn)
        {
            if (conn.State == ConnectionState.Open)
            {
                conn.Close();
                conn.Dispose();//释放资源  
            }
        }
 
        /// <summary>  
        /// 关闭DataReader  
        /// </summary>  
        /// <param name="dataReader">数据读取器对象</param>  
        private static void CloseDataReader(SqlDataReader dataReader)
        {
            if (dataReader.IsClosed == false) dataReader.Close();
        }
        #endregion
    }
}

Connection.cs 类示例代码:

using System;
using System.Collections.Generic;
using System.Text;
 
namespace DBHelper
{
    /// <summary>
    /// 数据库桥接对象模型
    /// </summary>
    public class Connection
    {
        /// <summary>
        /// 数据库IP
        /// </summary>
        public string DataSource { get; set; }
        /// <summary>
        /// 数据库授权账户
        /// </summary>
        public string UserID { get; set; }
        /// <summary>
        /// 数据库访问密码
        /// </summary>
        public string Password { get; set; }
        /// <summary>
        /// 数据库名称
        /// </summary>
        public string InitialCatalog { get; set; } 
    }
}

DataSyncHelper 控制台调用测试:

using DBHelper;
using System;
using System.Collections.Generic;
using Microsoft.Data.SqlClient;
using System.Data;
 
namespace DataSyncHelper
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Hello World!");
 
            //源数据库(本地)连接
            var conn = new Connection
            {
                DataSource = "192.168.10.228",
                UserID = "sa",
                Password = "123456",
                InitialCatalog = "TestDB"
            };
 
            //目标数据库连接(多个目标数据库添加List对象即可)
            var connList = new List<Connection> 
            {
                new Connection
                {
                    DataSource = "192.168.10.228",
                    UserID = "sa",
                    Password = "123456",
                    InitialCatalog = "TestDB2"
                }
            };
 
 
            Test1(conn);
 
            Test2(conn, connList);
 
        }
 
        /// <summary>
        /// 模拟数据库批量插入数据【INSERT INTO [TABLE] VALUES】
        /// </summary>
        /// <param name="conn">源数据库桥接对象</param>
        static void Test1(Connection conn) 
        {
            var ml = new List<TestTb>(); //模拟10条数据
            for (int i = 0; i < 10; i++)
            {
                ml.Add(new TestTb
                {
                    Id = Guid.NewGuid().ToString("N"),
                    Name = $"张三{i}",
                    No = i,
                    Birthday = DateTime.Now,
                    Remark = $"张三{i}"
                });
            }
            var keyValuePair = new KeyValuePair<string, List<TestTb>>("TestTb", ml);
            MsSqlHelper.GetSingleObj().RegisterConn(conn); //注册conn桥接对象
            MsSqlHelper.GetSingleObj().InsertValuesToDB(keyValuePair);
        }
 
 
        /// <summary>
        /// 同步数据(一主多从)
        /// </summary>
        /// <param name="conn">源数据库桥接对象</param>
        /// <param name="connList">目标数据库桥接对象</param>
        static void Test2(Connection conn, List<Connection> connList) 
        {
            #region 源数据库(本地)连接
            MsSqlHelper.GetSingleObj().RegisterConn(conn); //注册conn桥接对象
            string sql = "SELECT * FROM [TestTb];";
            var dt = MsSqlHelper.GetSingleObj().GetDataTable(sql);
            dt.TableName = "TestTb";
            #endregion
 
            #region 目标数据库连接(Demo演示中暂时只有一个)
            var dtDic = new Dictionary<Connection, DataTable>(); //原始数据集(保留原始表数据)
            foreach (var item in connList)
            {
                MsSqlHelper.GetSingleObj().RegisterConn(item); //注册目标conn桥接对象
                var oldDt = MsSqlHelper.GetSingleObj().GetDataTable(sql); //查询历史数据
                dtDic.Add(item, oldDt);
                string delSql = "DELETE FROM [TestTb];"; //全表删除历史数据
                int rcount = MsSqlHelper.GetSingleObj().ExecNonQuery(delSql);
                int newRCount = MsSqlHelper.GetSingleObj().InsertBulkToDB(dt, dt.TableName);
            }
            #endregion
        }
 
    }
 
    /// <summary>
    /// 原始表模型
    /// </summary>
    public class TestTb 
    {
        public string Id { get; set; }
        public string Name { get; set; }
        public int? No { get; set; } 
        public DateTime? Birthday { get; set; } 
        public string Remark { get; set; }
    }
}

测试结果

首先给源数据库【TestDB】模拟 10 条数据 =》 Test1 方法,执行结果如下:

image.png

同步目标数据库【TestDB2】中的【TestTB】对象 =》 Test2 方法,执行结果如下:

image.png

以上 Demo 演示完毕,在生产环境中可根据自己的实际情况修改调整。

目录
相关文章
|
1月前
|
存储 开发框架 JSON
ASP.NET Core OData 9 正式发布
【10月更文挑战第8天】Microsoft 在 2024 年 8 月 30 日宣布推出 ASP.NET Core OData 9,此版本与 .NET 8 的 OData 库保持一致,改进了数据编码以符合 OData 规范,并放弃了对旧版 .NET Framework 的支持,仅支持 .NET 8 及更高版本。新版本引入了更快的 JSON 编写器 `System.Text.UTF8JsonWriter`,优化了内存使用和序列化速度。
|
2月前
|
开发框架 监控 前端开发
在 ASP.NET Core Web API 中使用操作筛选器统一处理通用操作
【9月更文挑战第27天】操作筛选器是ASP.NET Core MVC和Web API中的一种过滤器,可在操作方法执行前后运行代码,适用于日志记录、性能监控和验证等场景。通过实现`IActionFilter`接口的`OnActionExecuting`和`OnActionExecuted`方法,可以统一处理日志、验证及异常。创建并注册自定义筛选器类,能提升代码的可维护性和复用性。
|
2月前
|
开发框架 .NET 中间件
ASP.NET Core Web 开发浅谈
本文介绍ASP.NET Core,一个轻量级、开源的跨平台框架,专为构建高性能Web应用设计。通过简单步骤,你将学会创建首个Web应用。文章还深入探讨了路由配置、依赖注入及安全性配置等常见问题,并提供了实用示例代码以助于理解与避免错误,帮助开发者更好地掌握ASP.NET Core的核心概念。
92 3
|
1月前
|
开发框架 JavaScript 前端开发
一个适用于 ASP.NET Core 的轻量级插件框架
一个适用于 ASP.NET Core 的轻量级插件框架
|
2月前
|
开发框架 NoSQL .NET
利用分布式锁在ASP.NET Core中实现防抖
【9月更文挑战第5天】在 ASP.NET Core 中,可通过分布式锁实现防抖功能,仅处理连续相同请求中的首个请求,其余请求返回 204 No Content,直至锁释放。具体步骤包括:安装分布式锁库如 `StackExchange.Redis`;创建分布式锁服务接口及其实现;构建防抖中间件;并在 `Startup.cs` 中注册相关服务和中间件。这一机制有效避免了短时间内重复操作的问题。
|
3月前
|
开发框架 监控 .NET
开发者的革新利器:ASP.NET Core实战指南,构建未来Web应用的高效之道
【8月更文挑战第28天】本文探讨了如何利用ASP.NET Core构建高效、可扩展的Web应用。ASP.NET Core是一个开源、跨平台的框架,具有依赖注入、配置管理等特性。文章详细介绍了项目结构规划、依赖注入配置、中间件使用及性能优化方法,并讨论了安全性、可扩展性以及容器化的重要性。通过这些技术要点,开发者能够快速构建出符合现代Web应用需求的应用程序。
57 0
|
3月前
|
缓存 数据库连接 API
Entity Framework Core——.NET 领域的 ORM 利器,深度剖析其最佳实践之路
【8月更文挑战第28天】在软件开发领域,高效的数据访问与管理至关重要。Entity Framework Core(EF Core)作为一款强大的对象关系映射(ORM)工具,在 .NET 开发中扮演着重要角色。本文通过在线书店应用案例,展示了 EF Core 的核心特性和优势。我们定义了 `Book` 实体类及其属性,并通过 `BookStoreContext` 数据库上下文配置了数据库连接。EF Core 提供了简洁的 API,支持数据的查询、插入、更新和删除操作。
115 0
|
3月前
|
开发框架 监控 .NET
【Azure 应用程序见解】在Docker中运行的ASP.NET Core应用如何开启Application Insights的Profiler Trace呢?
【Azure 应用程序见解】在Docker中运行的ASP.NET Core应用如何开启Application Insights的Profiler Trace呢?
|
3月前
|
Linux C# C++
【Azure App Service For Container】创建ASP.NET Core Blazor项目并打包为Linux镜像发布到Azure应用服务
【Azure App Service For Container】创建ASP.NET Core Blazor项目并打包为Linux镜像发布到Azure应用服务
|
3月前
|
开发框架 .NET API
如何在 ASP.NET Core Web Api 项目中应用 NLog 写日志?
如何在 ASP.NET Core Web Api 项目中应用 NLog 写日志?
169 0

热门文章

最新文章