.NET Core 3.0 控制台实现数据同步

简介: 写作背景:应工作环境中存在一个数据库实例多站点部署模式,每次同步数据都需要手动从本地导入目标站点数据库,空余之际写了个简单 Demo;技术点或 Nuget 元包:.NET Core 3.0Console;Microsoft.Data.SqlClient -v 1.0.19269.1;开发工具 VS 2019 Pro x64 v16.3.3;MS-SQLServer 2014 Enterprise ...

写作背景

因工作环境中存在一个数据库实例多站点部署模式,每次同步数据都需要手动从本地导入目标站点数据库,空余之际写了个简单 Demo;

技术点或Nuget元包:

  • .NET Core 3.0 Console;
  • Microsoft.Data.SqlClient -v1.0.19269.1;
  • 开发工具 VS 2019 Pro x64 v16.3.3;
  • MS-SQLServer 2014 Enterprise;

实现目标:

本地开发环境的正式数据同步到线上目标数据库(单表全字段同结构模式);

说明:这里使用本地环境同数据库类型的不同 DB 实例模拟实现,假如数据库【TestDB】为源数据库,【TestDB2】为目标数据库,都有共同的数据表对象【TestTB】;

Demo 项目结构:

image.png

DHHelper 封装

基于 Microsoft.Data.SqlClient 简单的实现了所需的几个方法,如下所示:

MsSqlHelper.cs 类示例代码:

using System;
using System.Collections.Generic;
using System.Text;
using System.Data;
using Microsoft.Data.SqlTypes;
using Microsoft.Data.SqlClient;
using System.Reflection;
 
namespace DBHelper
{
    public sealed class MsSqlHelper
    {
        #region 单利模式
        /// <summary>
        /// 1.构造函数私有化
        /// </summary>
        private MsSqlHelper() { }
 
        /// <summary>
        /// 2.创建私有化静态字段锁
        /// </summary>
        private static readonly object _ObjLock = new object();
 
        /// <summary>
        /// 3.创建私有化类对象,接收类的实例化对象
        /// volatile 关键字促进线程安全,保障线程有序执行
        /// </summary>
        private static volatile MsSqlHelper _MsSqlHelper = null;
 
        /// <summary>
        /// 4.创建类实例化对象
        /// </summary>
        /// <returns></returns>
        public static MsSqlHelper GetSingleObj()
        {
            if (_MsSqlHelper == null)
            {
                lock (_ObjLock) //保证只有一线程操作
                {
                    if (_MsSqlHelper == null)
                    {
                        _MsSqlHelper = new MsSqlHelper();
                    }
                }
            }
            return _MsSqlHelper;
        }
        #endregion
 
        // 数据库链接字符串
        private string _ConnString { get; set; }
 
        /// <summary>
        /// 初始化指定数据库桥接字符串
        /// </summary>
        /// <param name="sqlConnStrBuilder">数据库连接对象</param>
        public void RegisterConn(Connection conn)
        {
            var builder = new SqlConnectionStringBuilder
            {
                DataSource = conn.DataSource, //your_server.database.windows.net
                UserID = conn.UserID, //your_user  
                Password = conn.Password, //your_password  
                InitialCatalog = conn.InitialCatalog //your_database  
            };
            _ConnString = builder.ConnectionString;
        }
 
        #region 单个数据(添加,更新,删除)
        /// <summary>  
        /// 执行SQL语句,返回影响的记录数  
        /// Insert插入,Delete删除,Update更新  
        /// </summary>  
        /// <param name="cmdText">sql语句</param>  
        /// <param name="sqlParams">[可选]sql参数化</param>  
        /// <returns>int:受影响的行数</returns>  
        public int ExecNonQuery(string cmdText, params SqlParameter[] sqlParams)
        {
            int rowsCount = 0;
            using (SqlConnection conn = new SqlConnection(_ConnString))  // 建立数据库连接对象  
            {
                OpenConnection(conn);
                using (SqlCommand cmd = conn.CreateCommand())
                {
                    cmd.CommandType = CommandType.Text; //指定cmd命令类型为文本类型(默认,可不写);  
                    cmd.CommandText = cmdText; //sql语句或存储过程                         
                    if (sqlParams != null && sqlParams.Length > 0) //检查参数组是否有数据  
                    {
                        foreach (SqlParameter sqlParam in sqlParams)
                        {
                            //判断参数是否为null,是则转为数据库接受的DBnull
                            if (sqlParam.Value == null)
                            {
                                sqlParam.Value = DBNull.Value;
                            }
                            cmd.Parameters.Add(sqlParam); //参数格式化,防止sql注入  
                        }
                    }
                    rowsCount = cmd.ExecuteNonQuery(); //执行非查询命令,接收受影响行数,大于0的话表示添加成功  
                    cmd.Parameters.Clear();
                }
                CloseConnection(conn);
            }
            return rowsCount;
        }
        #endregion
 
        #region 查询操作
        /// <summary>  
        /// 返回数据库表DataTable  
        /// </summary>  
        /// <param name="cmdText">sql语句</param>  
        /// <param name="sqlParams">sql参数化</param>  
        /// <returns>DataTable</returns>  
        public DataTable GetDataTable(string cmdText, params SqlParameter[] sqlParams)
        {
            using (DataTable dt = new DataTable())
            {
                using (SqlConnection conn = new SqlConnection(_ConnString))
                {
                    using (SqlCommand cmd = conn.CreateCommand())
                    {
                        cmd.CommandText = cmdText;
                        if (sqlParams != null && sqlParams.Length > 0)
                        {
                            foreach (SqlParameter parameter in sqlParams)
                            {
                                //判断参数是否为null,是则转为数据库接受的DBnull
                                if (parameter.Value == null)
                                {
                                    parameter.Value = DBNull.Value;
                                }
                                //参数格式化,防止sql注入  
                                cmd.Parameters.Add(parameter);
                            }
                        }
                        //适配器自动打开数据库连接  
                        using (SqlDataAdapter da = new SqlDataAdapter(cmd))
                        {
                            da.Fill(dt);
                        }
                        cmd.Parameters.Clear();
                    }
                }
                return dt;
            }
        }
 
        /// <summary>
        /// 执行多sql语句,返回数据集
        /// </summary>
        /// <param name="sqlTuples">list:tabNames[可选],sql,SqlParameter[]</param>
        /// <returns>DataSet</returns>
        public DataSet GetDataSet(List<Tuple<string, string, SqlParameter[]>> sqlTuples)
        {
            using (DataSet ds = new DataSet())
            {
                string tabName = string.Empty; //tab名称
                string cmdText = string.Empty; //sql语句   
                SqlParameter[] sqlParams = null;//sql参数化  
 
                if (sqlTuples != null && sqlTuples.Count > 0)
                {
                    foreach (var tuple in sqlTuples)
                    {
                        tabName = tuple.Item1; //tab名称
                        cmdText = tuple.Item2; //sql语句   
                        sqlParams = tuple.Item3;//sql格式化参数
                        using (SqlConnection conn = new SqlConnection(_ConnString))
                        {
                            using (SqlCommand cmd = conn.CreateCommand())
                            {
                                cmd.CommandText = cmdText;
                                //检查参数组是否有数据  
                                if (sqlParams != null && sqlParams.Length > 0)
                                {
                                    //cmd.Parameters.AddRange(sqlParams);
                                    foreach (SqlParameter parameter in sqlParams)
                                    {
                                        //判断参数是否为null,是则转为数据库接受的DBnull
                                        if (parameter.Value == null)
                                        {
                                            parameter.Value = DBNull.Value;
                                        }
                                        //参数格式化,防止sql注入  
                                        cmd.Parameters.Add(parameter);
                                    }
                                }
 
                                //适配器自动打开数据库连接  
                                using (SqlDataAdapter da = new SqlDataAdapter(cmd))
                                {
                                    if (string.IsNullOrWhiteSpace(tabName))
                                        da.Fill(ds);
                                    else
                                        da.Fill(ds, tabName); // 将tabName查询结果集合填入DataSet中,并且将DataTable命名为tabName  
                                }
                                cmd.Parameters.Clear();
                            }
                        }
                    }
                }
                return ds;
            }
        }
        #endregion
 
        #region 批量添加数据
        /// <summary>
        /// 批量插入数据【INSERT INTO [TABLE] VALUES】,并返回受影响的行数  
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="keyValuePair"></param>
        /// <returns></returns>
        public int InsertValuesToDB<T>(KeyValuePair<string,List<T>> keyValuePair) 
        {
            string sql = $"INSERT INTO [{keyValuePair.Key}] @NAME VALUES @VALUES;";
            var dicList = new List<Dictionary<string, string>>();
            foreach (var item in keyValuePair.Value)
            {
                dicList.Add(GetProperties(item));
            }
 
            var name = new StringBuilder();
            var values = new StringBuilder();
            for (int i = 0; i < dicList.Count; i++)
            {
                var dic = dicList[i];
                int columns = 0;
                var tmpValue = new StringBuilder();
                foreach (var item in dic)
                {
                    if (i == 0)
                    {
                        if (columns < dic.Count -1)
                        {
                            name.Append($"[{item.Key}],");
                            tmpValue.Append($"'{item.Value}',");
                        }
                        else
                        {
                            name.Append($"[{item.Key}]");
                            tmpValue.Append($"'{item.Value}'");
                        }
                    }
                    else if(i >0 && i < dic.Count - 1)
                    {
                        if (columns < dic.Count - 1)
                        {
                            tmpValue.Append($"'{item.Value}',");
                        }
                        else
                        {
                            tmpValue.Append($"'{item.Value}'");
                        }
                    }
                    else
                    {
                        if (columns < dic.Count - 1)
                        {
                            tmpValue.Append($"'{item.Value}',");
                        }
                        else
                        {
                            tmpValue.Append($"'{item.Value}'");
                        }
                    }
                    columns++;
                }
                if (i == 0)
                {
                    sql = sql.Replace("@NAME", $"({name.ToString()})");
                }
                if (i < dicList.Count - 1)
                {
                    values.AppendLine($"({tmpValue.ToString()}),");
                }
                else
                {
                    values.Append($"({tmpValue.ToString()})");
                }
            }
            sql = sql.Replace("@VALUES", values.ToString());
            return ExecNonQuery(sql);
        }
 
        /// <summary>
        /// 反射得到实体类(泛型T)的字段名称和值
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="t"></param>
        /// <returns></returns>
        private Dictionary<string, string> GetProperties<T>(T t)
        {
            var ret = new Dictionary<string, string>();
            if (t == null)
                return null;
 
            PropertyInfo[] properties = t.GetType().GetProperties(BindingFlags.Instance | BindingFlags.Public);
            if (properties.Length <= 0)
                return null;
 
            foreach (PropertyInfo item in properties)
            {
                string name = item.Name; //实体类字段名称
                string value = Convert.ToString(item.GetValue(t, null)); //该字段的值
                //Type type = item.PropertyType; //获取值value的类型
                if (item.PropertyType.IsValueType || item.PropertyType.Name.StartsWith("String"))
                {
                    ret.Add(name, value); //在此可转换value的类型
                }
            }
            return ret;
        }
 
 
        /// <summary>  
        /// 1.批量插入数据【Bulk】,并返回受影响的行数  
        /// </summary>  
        /// <param name="dt">DataTable</param> 
        /// <param name="dtName">表名称</param>  
        /// <returns>int:受影响的行数</returns>  
        public int InsertBulkToDB(DataTable dt, string dtName)
        {
            string tableName = dtName ?? dt.TableName;
            int rowsCount = 0;
            using (SqlConnection conn = new SqlConnection(_ConnString))
            {
                OpenConnection(conn);
                using (SqlBulkCopy bulkCopy = new SqlBulkCopy(conn))
                {
                    if (dt != null && dt.Rows.Count != 0)
                    {
                        bulkCopy.DestinationTableName = tableName; //数据表名称  
                        bulkCopy.BatchSize = dt.Rows.Count;
                        bulkCopy.WriteToServer(dt);
                        rowsCount = bulkCopy.BatchSize;
                    }
                    CloseConnection(conn);
                }
            }
            return rowsCount;
        }
        /// <summary>  
        /// 2.批量插入数据【Bulk】(加入内部事务),并返回受影响的行数  
        /// </summary>  
        /// <param name="dt">DataTable</param> 
        /// <param name="dtName">表名称</param>  
        /// <returns>int:受影响的行数</returns>  
        public int InsertBulkToDBByTransaction(DataTable dt, string dtName)
        {
            SqlTransaction transaction = null; // 创建事务对象  
            try
            {
                string tableName = dtName ?? dt.TableName;
                int rowsCount = 0;
                using (SqlConnection conn = new SqlConnection(_ConnString))
                {
                    OpenConnection(conn);
                    using (SqlBulkCopy bulkCopy = new SqlBulkCopy(_ConnString, SqlBulkCopyOptions.UseInternalTransaction))
                    {
                        if (dt != null && dt.Rows.Count != 0)
                        {
                            bulkCopy.DestinationTableName = tableName; //数据表名称  
                            bulkCopy.BatchSize = dt.Rows.Count;
                            bulkCopy.WriteToServer(dt);
                            rowsCount = bulkCopy.BatchSize;
                        }
                        CloseConnection(conn);
                    }
                }
                return rowsCount;
            }
            catch (Exception ex)
            {
                transaction.Rollback(); //事务回滚  
                throw new Exception(ex.Message, ex);
            }
        }
        #endregion
 
        #region 开启连接SqlConnection.Open  
        /// <summary>  
        /// 打开OracleConnection  
        /// </summary>  
        /// <param name="conn">数据库连接对象</param>  
        private static void OpenConnection(SqlConnection conn)
        {
            try
            {
                if (conn.State == ConnectionState.Closed) conn.Open();
            }
            catch (Exception ex)
            {
                throw new Exception(ex.Message, null);
            }
        }
        #endregion
 
        #region 关闭连接,释放资源  
        /// <summary>  
        /// 关闭Connection  
        /// </summary>  
        /// <param name="conn">数据库(Oracle)连接对象</param>  
        private static void CloseConnection(SqlConnection conn)
        {
            if (conn.State == ConnectionState.Open)
            {
                conn.Close();
                conn.Dispose();//释放资源  
            }
        }
 
        /// <summary>  
        /// 关闭DataReader  
        /// </summary>  
        /// <param name="dataReader">数据读取器对象</param>  
        private static void CloseDataReader(SqlDataReader dataReader)
        {
            if (dataReader.IsClosed == false) dataReader.Close();
        }
        #endregion
    }
}

Connection.cs 类示例代码:

using System;
using System.Collections.Generic;
using System.Text;
 
namespace DBHelper
{
    /// <summary>
    /// 数据库桥接对象模型
    /// </summary>
    public class Connection
    {
        /// <summary>
        /// 数据库IP
        /// </summary>
        public string DataSource { get; set; }
        /// <summary>
        /// 数据库授权账户
        /// </summary>
        public string UserID { get; set; }
        /// <summary>
        /// 数据库访问密码
        /// </summary>
        public string Password { get; set; }
        /// <summary>
        /// 数据库名称
        /// </summary>
        public string InitialCatalog { get; set; } 
    }
}

DataSyncHelper 控制台调用测试:

using DBHelper;
using System;
using System.Collections.Generic;
using Microsoft.Data.SqlClient;
using System.Data;
 
namespace DataSyncHelper
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Hello World!");
 
            //源数据库(本地)连接
            var conn = new Connection
            {
                DataSource = "192.168.10.228",
                UserID = "sa",
                Password = "123456",
                InitialCatalog = "TestDB"
            };
 
            //目标数据库连接(多个目标数据库添加List对象即可)
            var connList = new List<Connection> 
            {
                new Connection
                {
                    DataSource = "192.168.10.228",
                    UserID = "sa",
                    Password = "123456",
                    InitialCatalog = "TestDB2"
                }
            };
 
 
            Test1(conn);
 
            Test2(conn, connList);
 
        }
 
        /// <summary>
        /// 模拟数据库批量插入数据【INSERT INTO [TABLE] VALUES】
        /// </summary>
        /// <param name="conn">源数据库桥接对象</param>
        static void Test1(Connection conn) 
        {
            var ml = new List<TestTb>(); //模拟10条数据
            for (int i = 0; i < 10; i++)
            {
                ml.Add(new TestTb
                {
                    Id = Guid.NewGuid().ToString("N"),
                    Name = $"张三{i}",
                    No = i,
                    Birthday = DateTime.Now,
                    Remark = $"张三{i}"
                });
            }
            var keyValuePair = new KeyValuePair<string, List<TestTb>>("TestTb", ml);
            MsSqlHelper.GetSingleObj().RegisterConn(conn); //注册conn桥接对象
            MsSqlHelper.GetSingleObj().InsertValuesToDB(keyValuePair);
        }
 
 
        /// <summary>
        /// 同步数据(一主多从)
        /// </summary>
        /// <param name="conn">源数据库桥接对象</param>
        /// <param name="connList">目标数据库桥接对象</param>
        static void Test2(Connection conn, List<Connection> connList) 
        {
            #region 源数据库(本地)连接
            MsSqlHelper.GetSingleObj().RegisterConn(conn); //注册conn桥接对象
            string sql = "SELECT * FROM [TestTb];";
            var dt = MsSqlHelper.GetSingleObj().GetDataTable(sql);
            dt.TableName = "TestTb";
            #endregion
 
            #region 目标数据库连接(Demo演示中暂时只有一个)
            var dtDic = new Dictionary<Connection, DataTable>(); //原始数据集(保留原始表数据)
            foreach (var item in connList)
            {
                MsSqlHelper.GetSingleObj().RegisterConn(item); //注册目标conn桥接对象
                var oldDt = MsSqlHelper.GetSingleObj().GetDataTable(sql); //查询历史数据
                dtDic.Add(item, oldDt);
                string delSql = "DELETE FROM [TestTb];"; //全表删除历史数据
                int rcount = MsSqlHelper.GetSingleObj().ExecNonQuery(delSql);
                int newRCount = MsSqlHelper.GetSingleObj().InsertBulkToDB(dt, dt.TableName);
            }
            #endregion
        }
 
    }
 
    /// <summary>
    /// 原始表模型
    /// </summary>
    public class TestTb 
    {
        public string Id { get; set; }
        public string Name { get; set; }
        public int? No { get; set; } 
        public DateTime? Birthday { get; set; } 
        public string Remark { get; set; }
    }
}

测试结果

首先给源数据库【TestDB】模拟 10 条数据 =》 Test1 方法,执行结果如下:

image.png

同步目标数据库【TestDB2】中的【TestTB】对象 =》 Test2 方法,执行结果如下:

image.png

以上 Demo 演示完毕,在生产环境中可根据自己的实际情况修改调整。

目录
相关文章
|
2月前
|
开发框架 .NET C#
ASP.NET Core Blazor 路由配置和导航
大家好,我是码农刚子。本文系统介绍Blazor单页应用的路由机制,涵盖基础配置、路由参数、编程式导航及高级功能。通过@page指令定义路由,支持参数约束、可选参数与通配符捕获,结合NavigationManager实现页面跳转与参数传递,并演示用户管理、产品展示等典型场景,全面掌握Blazor路由从入门到实战的完整方案。
232 6
|
12月前
|
开发框架 .NET 开发者
简化 ASP.NET Core 依赖注入(DI)注册-Scrutor
Scrutor 是一个简化 ASP.NET Core 应用程序中依赖注入(DI)注册过程的开源库,支持自动扫描和注册服务。通过简单的配置,开发者可以轻松地从指定程序集中筛选、注册服务,并设置其生命周期,同时支持服务装饰等高级功能。适用于大型项目,提高代码的可维护性和简洁性。仓库地址:&lt;https://github.com/khellang/Scrutor&gt;
295 5
|
存储 开发框架 JSON
ASP.NET Core OData 9 正式发布
【10月更文挑战第8天】Microsoft 在 2024 年 8 月 30 日宣布推出 ASP.NET Core OData 9,此版本与 .NET 8 的 OData 库保持一致,改进了数据编码以符合 OData 规范,并放弃了对旧版 .NET Framework 的支持,仅支持 .NET 8 及更高版本。新版本引入了更快的 JSON 编写器 `System.Text.UTF8JsonWriter`,优化了内存使用和序列化速度。
272 0
|
开发框架 .NET C#
在 ASP.NET Core 中创建 gRPC 客户端和服务器
本文介绍了如何使用 gRPC 框架搭建一个简单的“Hello World”示例。首先创建了一个名为 GrpcDemo 的解决方案,其中包含一个 gRPC 服务端项目 GrpcServer 和一个客户端项目 GrpcClient。服务端通过定义 `greeter.proto` 文件中的服务和消息类型,实现了一个简单的问候服务 `GreeterService`。客户端则通过 gRPC 客户端库连接到服务端并调用其 `SayHello` 方法,展示了 gRPC 在 C# 中的基本使用方法。
259 5
在 ASP.NET Core 中创建 gRPC 客户端和服务器
|
12月前
|
开发框架 算法 中间件
ASP.NET Core 中的速率限制中间件
在ASP.NET Core中,速率限制中间件用于控制客户端请求速率,防止服务器过载并提高安全性。通过`AddRateLimiter`注册服务,并配置不同策略如固定窗口、滑动窗口、令牌桶和并发限制。这些策略可在全局、控制器或动作级别应用,支持自定义响应处理。使用中间件`UseRateLimiter`启用限流功能,并可通过属性禁用特定控制器或动作的限流。这有助于有效保护API免受滥用和过载。 欢迎关注我的公众号:Net分享 (239字符)
265 1
|
12月前
|
开发框架 缓存 .NET
GraphQL 与 ASP.NET Core 集成:从入门到精通
本文详细介绍了如何在ASP.NET Core中集成GraphQL,包括安装必要的NuGet包、创建GraphQL Schema、配置GraphQL服务等步骤。同时,文章还探讨了常见问题及其解决方法,如处理复杂查询、错误处理、性能优化和实现认证授权等,旨在帮助开发者构建灵活且高效的API。
328 3
|
开发框架 监控 前端开发
在 ASP.NET Core Web API 中使用操作筛选器统一处理通用操作
【9月更文挑战第27天】操作筛选器是ASP.NET Core MVC和Web API中的一种过滤器,可在操作方法执行前后运行代码,适用于日志记录、性能监控和验证等场景。通过实现`IActionFilter`接口的`OnActionExecuting`和`OnActionExecuted`方法,可以统一处理日志、验证及异常。创建并注册自定义筛选器类,能提升代码的可维护性和复用性。
214 3
|
开发框架 .NET 中间件
ASP.NET Core Web 开发浅谈
本文介绍ASP.NET Core,一个轻量级、开源的跨平台框架,专为构建高性能Web应用设计。通过简单步骤,你将学会创建首个Web应用。文章还深入探讨了路由配置、依赖注入及安全性配置等常见问题,并提供了实用示例代码以助于理解与避免错误,帮助开发者更好地掌握ASP.NET Core的核心概念。
295 3
|
开发框架 NoSQL .NET
利用分布式锁在ASP.NET Core中实现防抖
【9月更文挑战第5天】在 ASP.NET Core 中,可通过分布式锁实现防抖功能,仅处理连续相同请求中的首个请求,其余请求返回 204 No Content,直至锁释放。具体步骤包括:安装分布式锁库如 `StackExchange.Redis`;创建分布式锁服务接口及其实现;构建防抖中间件;并在 `Startup.cs` 中注册相关服务和中间件。这一机制有效避免了短时间内重复操作的问题。
315 4
|
开发框架 JavaScript 前端开发
一个适用于 ASP.NET Core 的轻量级插件框架
一个适用于 ASP.NET Core 的轻量级插件框架
205 0

热门文章

最新文章