SQL Server 多表数据增量获取和发布 4

本文涉及的产品
RDS SQL Server Serverless,2-4RCU 50GB 3个月
推荐场景:
云数据库 RDS SQL Server,基础系列 2核4GB
简介: 核心代码分析最关键的在于获取捕获表信息(系统表中间_CT结尾的数据)。根据网上资料查取,找到了获取当前捕获表时间区间范围内数据的方式。见[SQL Server 多表数据增量获取和发布 2.

核心代码分析

最关键的在于获取捕获表信息(系统表中间_CT结尾的数据)。
根据网上资料查取,找到了获取当前捕获表时间区间范围内数据的方式。
见[SQL Server 多表数据增量获取和发布 2.3(https://www.jianshu.com/p/6a400eca6e79)

--10.按照时间范围查询CDC结果
DECLARE @from_lsn BINARY(10),@end_lsn BINARY(10)
DECLARE @start_time DATETIME = '2018-08-01'
DECLARE @end_time DATETIME ='2018-08-30'
SELECT @from_lsn=sys.fn_cdc_map_time_to_lsn('smallest greater than or equal',@start_time)
SELECT @end_lsn=sys.fn_cdc_map_time_to_lsn(' largest less than or equal',@end_time)
SELECT * FROM cdc.fn_cdc_get_all_changes_dbo_Department(@from_lsn,@end_lsn,'all')

数据既然能够通过sql语句获取到,那么逻辑判断就会变得简单,通过分析我们可以发现select * from XXX ,XXX就是上文中讲到的CDC生成的表值函数,表值函数前面相等,可变化的就是架构名_表名称(dbo_Person)


img_3d82cc144eef60515a92ed7e3c56880e.png
image.png

所以我们完全可以通过拼接sql语句得到我们需要的内容,可以默认返回给我们的数据是不友好的,我们还需要自己在做一步设置,将某些字段变成我们好理解的内容
如对下文内容不理解,可翻阅LZ之前的文章

  • sys.fn_cdc_map_lsn_to_time(__$start_lsn) AS UpdateTime
  • [__$operation] AS Operation

通过查看CDC生成的捕获表我们发现,其实他是在原来的数据表结构上新增了几个字段给我们,其他的表也相同。

img_26cad58ed602a7b2143e34bc96613086.png
image.png
img_f34a72265c6d80915e1958ed34cc4782.png
image.png

那我们在代码中对实体的设计就可以基于继承相同父类的方式,定义一个父类,拥有共同属性

    public partial class ExtBase
    {
        /// <summary>
        /// 更新时间
        /// </summary>
        public DateTime UpdateTime { get; set; }

        /// <summary>
        /// 操作方式 1 = 删除,2 = 插入,3 = 更新(旧值),4 = 更新(新值)
        /// </summary>
        public int Operation { get; set; }
    }

其他表都是在自己原来字段的基础上继承当前父类

   public class Department : ExtBase
    {
        public int Id { get; set; }
        public string Name { get; set; }
    }

    public class Person : ExtBase
    {

        public int Id { get; set; }

        public string Name { get; set; }

        public int? Age { get; set; }
    }

实体类结构完毕后我们开始考虑获取数据的业务逻辑,根据业务我们可以假设获取数据的方法几乎相同,不同的地方就是返回的数据实体集合不同,那我们通过何种方法来完成逻辑的有效封装,这是需要考虑的问题。
经过思考,我构想出了一种方法

1、定义一个抽象基类,在其中定义公共业务逻辑(GetDate)方法,然后定义一个抽象方法,抽象方法需要被子类继承,而子类需要做的就是覆写父类的GetData方法,唯一需要修改的就是传递的实体——可以采用泛型变量的形式去实现
2、等所有的子类构建完成以后,创建一个简单工厂,传递需要的参数,然后根据参数中的唯一标识符,实例化对应的操作类去执行公共方法。

首先是基类抽象类

    /// <summary>
    /// Cdc 数据捕获服务类
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public abstract class CTBaseService
{
        /// <summary>
        /// 获取CDC捕获表的数据
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="schema_table"></param>
        /// <param name="startDateTime"></param>
        /// <returns></returns>
        private List<T> GetRangeList<T>(string schema_table, DateTime startDateTime) where T : class, new()
        {
            //获取当前需要更新的日期集合列表
            var conn = new SqlConnection(StaticConst.Conn);
            string query = @"SELECT  * ,sys.fn_cdc_map_lsn_to_time(__$start_lsn) AS UpdateTime,[__$operation] AS Operation FROM [cdc].[{2}_CT] WHERE[__$operation] IN(1, 2, 4) AND sys.fn_cdc_map_lsn_to_time(__$start_lsn) > '{0}'
              AND sys.fn_cdc_map_lsn_to_time(__$start_lsn) <= '{1}';";
            string nowDate = DateTime.Now.ToString();
            query = string.Format(query, startDateTime.ToString(), nowDate, schema_table);
            var queryList = conn.Query<T>(query).ToList();
            return queryList;
        }


        /// <summary>
        /// 抽象方法,由父类实现
        /// </summary>
        /// <param name="id"></param>
        /// <param name="schema_table"></param>
        /// <param name="startDateTime"></param>
        public abstract void Work(int id, string schema_table, DateTime startDateTime);


        /// <summary>
        /// 得到CDC捕获数据并插入队列
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="id"></param>
        /// <param name="schema_table"></param>
        /// <param name="startDateTime"></param>
        protected void GetRangeListAndInsertQueue<T>(int id, string schema_table, DateTime startDateTime) where T : ExtBase, new()
        {
            //获取当前需要更新的日期集合列表
            List<T> queryList = GetRangeList<T>(schema_table, startDateTime);

            if (queryList.Count > 0)
            {
                //对集合进行操作 【集合序列化,注意集合反序列化】
                string jsonItem = JsonConvert.SerializeObject(queryList);
                QueueWorker.Instance.EnqueueItem(jsonItem);

                int iret = UpdateServiceLog(id, queryList);
                string nowDate = DateTime.Now.ToString();
                if (iret > 0)
                {
                    Log4NetHelper.Info(string.Format("表名:{0}\r\n 队列数据:{1} \r\n 插入时间:{2}", schema_table, jsonItem, nowDate));
                }
            }

        }

}

子类实现
我们可以发现子类实现非常好理解,正如上文所说,基类提供了一个抽象接口供子类实现,而子类也真的只需要修改一个实体就可以。如果大家不懂,可以多看几遍来裂解其中的设计方式。

    public class DepartmentCT : CTBaseService
    {
        public override void Work(int id, string schema_table, DateTime startDateTime)
        {
            base.GetRangeListAndInsertQueue<Department>(id, schema_table, startDateTime);
        }
    }
    public class PersonCT : CTBaseService
    {
        public override void Work(int id, string schema_table, DateTime startDateTime)
        {
          base.GetRangeListAndInsertQueue<Person>(id, schema_table, startDateTime);
        }
    }

最后我们建立一个工厂类,工厂类主要负责接受参数并创建对应的CT帮助类,代码结构如下。根据表名作为唯一标识符字段,创建***CT服务类,然后因为他们继承并覆写了父类抽象方法Work,所以调用.Work方法即可实现获取数据并插入队列的功能。

    public class CTCExecuteFactory
    {
        /// <summary>
        /// 执行服务
        /// </summary>
        /// <param name="id"></param>
        /// <param name="schema_table"></param>
        /// <param name="updateTime"></param>
        public static void ExecuteService(int id, string schemaName, string tableName, DateTime updateTime)
        {
            CTBaseService service = null;
            switch (tableName.ToLower())
            {
                case ServiceTables.person: service = new PersonCT(); break;
                case ServiceTables.department: service = new DepartmentCT(); break;
                default: break;
            }

            if (service != null) service.Work(id, string.Format("{0}_{1}", schemaName, tableName), updateTime);
        }
    }

其他模块的代码我觉得属于正常理解范围内的东西,不予说明,有兴趣的可自行下载代码查看具体功能。
下载链接

相关实践学习
使用SQL语句管理索引
本次实验主要介绍如何在RDS-SQLServer数据库中,使用SQL语句管理索引。
SQL Server on Linux入门教程
SQL Server数据库一直只提供Windows下的版本。2016年微软宣布推出可运行在Linux系统下的SQL Server数据库,该版本目前还是早期预览版本。本课程主要介绍SQLServer On Linux的基本知识。 相关的阿里云产品:云数据库RDS&nbsp;SQL Server版 RDS SQL Server不仅拥有高可用架构和任意时间点的数据恢复功能,强力支撑各种企业应用,同时也包含了微软的License费用,减少额外支出。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/sqlserver
目录
相关文章
|
1天前
|
SQL 移动开发 Oracle
SQL语句实现查询连续六天数据的方法与技巧
在数据库查询中,有时需要筛选出符合特定时间连续性条件的数据记录
|
8天前
|
SQL 存储 关系型数据库
添加数据到数据库的SQL语句详解与实践技巧
在数据库管理中,添加数据是一个基本操作,它涉及到向表中插入新的记录
|
9天前
|
SQL 数据挖掘 数据库
SQL查询每秒的数据:技巧、方法与性能优化
id="">SQL查询功能详解 SQL(Structured Query Language,结构化查询语言)是一种专门用于与数据库进行沟通和操作的语言
|
11天前
|
SQL 监控 数据处理
SQL数据库数据修改操作详解
数据库是现代信息系统的重要组成部分,其中SQL(StructuredQueryLanguage)是管理和处理数据库的重要工具之一。在日常的业务运营过程中,数据的准确性和及时性对企业来说至关重要,这就需要掌握如何在数据库中正确地进行数据修改操作。本文将详细介绍在SQL数据库中如何修改数据,帮助读者更好
57 4
|
13天前
|
SQL 关系型数据库 MySQL
SQL批量插入测试数据的几种方法?
SQL批量插入测试数据的几种方法?
38 1
|
11天前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
41 0
|
11天前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
25 0
|
11天前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
31 0
|
13天前
|
SQL
使用SQL进行集合查询和数据维护
使用SQL进行集合查询和数据维护
32 0
|
1月前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
66 2