开源分布式数据库PolarDB-X源码解读——PolarDB-X源码解读(三):CDC代码结构

本文涉及的产品
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
简介: 开源分布式数据库PolarDB-X源码解读——PolarDB-X源码解读(三):CDC代码结构

polardbx-cdc是云原生分布式数据库系统PolarDB-X的一个核心组件,负责全局增量日志的生成、分发和订阅。通过polardbx-cdc,PolarDB-X数据库可以对外提供完全兼容MySQL Binlog格式和协议的增量日志,可实现与MySQL Binlog下游生态工具的无缝对接。本篇将对polardbx-cdc的代码结构做一个系统介绍,并讲解一下如何快速搭建开发环境。


一、总体架构


               


如上图所示,PolarDB-X包含4个核心组件,CN(Compute Node)负责计算、DN(Data Node)负责存储、GMS(Global Meta Service)负责管理元数据和提供TSO服务,CDC(Change Data Capture)负责生成变更日志。其中,CDC作为Binlog日志数据的出口,主要负责3方面的工作:


 基于TSO,对各个DN的原始物理Binlog进行排序和归并,构建出全局有序的事务队列。


 将事务队列中的数据,持久化到存储介质,生成兼容MySQL Binlog格式的逻辑Binlog文件。


 提供兼容MySQL Dump协议的消费订阅能力,对外提供Replication服务。


更加形象的来理解,polardbx-cdc是一个Streaming系统,InputSource是各个DN的原始Binlog,Computing逻辑是一系列计算规则和处理算法(排序、过略、整形、合并等等),TargetSink是屏蔽了分布式数据库内部细节的全局逻辑Binlog文件,并提供了兼容MySQL生态的Replication能力。其运行时状态图,如下所示:


         


从技术架构的角度来看,CDC共包含3个核心组件,分别是Daemon、Task和Dumper,如下所示:


         


 一个Node代表一个运行节点,可以是一台物理机、一台ECS或一个Docker容器,可以部署1到n个Node,但如果需要HA能力,则至少需要部署两个。  


 每个Node上都会运行一个Daemon进程,该进程负责进行监控、管控、调度等功能,其定位可以和Hadoop中的NodeManager类比。多个Daemon进程中会有一个Leader角色,通过抢占的方式来获取,负责一些中心化的控制功能。


 Dumper负责从Task拉取经过处理后的binlog数据,将数据持久化到逻辑Binlog文件,并对外提供兼容Mysql Dump协议的Replication服务。每个Node上都会运行一个Dumper进程,其中有一个Dumper会被选定为Master角色,其余Dumper的角色为Slave,Slave会实时复制Master的全局逻辑Binlog文件,当发生故障时,Daemon会负责完成Master的故障转移。


 Task是CDC的内核,负责事务排序、数据整形、事务合并、DDL处理、元数据生命周期管理等功能,整个CDC集群只会有一个Task进程,调度程序会选择负载最低的Node运行Task,并尽量保证Dumper Master和Task在不同的Node。


二、工程结构


polardbx-cdc工程代码托管在GitHub上(仓库地址),遵循Apache License 2.0协议。polardbx-cdc是一个多模块的Java项目,模块之间通过接口暴露服务,模块关系记录在pom.xml中,通过mvn dependency:tree命令可以查看全部依赖。


               


如上图所示,整个项目共包含15个目录模块(除此之外,CDC在polardbx-cdc项目中还有部分功能模块,具体可参见[附3]),下面对每个模块展开介绍。  


 [模块]codestye


提供了一份内置的代码风格配置文件,主要面向IDEA,如果有兴趣贡献源码,可基于该style文件进行设置。


 [模块]docker


提供了一系列docker相关的配置文件,通过运行该目录下的build.sh,可以在本地快速构建出一份dokcer镜像。


 [模块]polardbx-cdc-assemble


binlog数据解析模块,引用了开源Canal的源码,并做了很多定制化改造,核心代码介绍如下:


包(文件)名称 功能简介
com.aliyun.polardbx.binlog.canal.binlog 定义了对应MySQL Binlog Event的各种数据模型,以及对Binlog二进制数据进行Decode的核心实现 
com.aliyun.polardbx.binlog.canal.core  Canal的运行时内核模块,包含数据解析流程的管理、ddl的处理、权限的管理、HA的处理等等 

 [模块]polardbx-cdc-common


公共类库模块,提供了基础工具、领域模型、数据访问、公共配置等,核心代码介绍如下:  


包(文件)名称 功能简介
com.aliyun.polardbx.binlog.domain  包含了领域模型类的定义,其中po包下的源码为代码生成工具自动生成,对应了CDC的元数据表 
com.aliyun.polardbx.binlog.dao  包含了所有的数据访问层类定义,也是代码生成工具自动生成
com.aliyun.polardbx.binlog.heartbeat  包含了TsoHeartbeat组件,保证binlog stream不断向前流动,也是混合事务策略场景下生成虚拟TSO的必要条件
com.aliyun.polardbx.binlog.leader 提供了一个基于MySQL GET_LOCK函数实现的LeaderElector 
com.aliyun.polardbx.binlog.rpc 定义了Task和Dumper之间进行数据交互的Rpc接口
com.aliyun.polardbx.binlog.scheduler 包含了集群调度和资源分配相关的功能实现 
com.aliyun.polardbx.binlog.task 包含了Task和Dumper组件定时心跳相关的功能实现
generatorConfig.xml

代码生成器配置文件,使用了MyBatis Generator进行自动化代码生成

 [模块]polardbx-cdc-daemon


守护进程模块,提供了HA调度、TSO心跳、监控信息收集、OpenAPI访问等功能,核心代码介绍如下:


包(文件)名称  功能简介 
com.aliyun.polardbx.binlog.daemon.cluster com.aliyun.polardbx.binlog.daemon.schedule  包含了集群管控相关功能,心跳、运行时拓扑调度、HA探活等
com.aliyun.polardbx.binlog.daemon.rest 包含了若干Rest风格的接口,报警事件收集、Metrics收集、系统参数设置等

 [模块]polardbx-cdc-dumper


全局Binlog文件dump模块,接收Task模块输入的binlog数据并落盘,并对外提供兼容MySQL dump协议的数据订阅服务,核心代码介绍如下:


包(文件)名称  功能简介 
com.aliyun.polardbx.binlog.dumper.dump.client Dumper主备之间进行数据同步,对应的Client实现 
com.aliyun.polardbx.binlog.dumper.dump.logfile  逻辑Binlog文件构建模块,最为核心的两个类分别是LogFileGenerator和LogFileCopier,分别负责Dumper 


 [模块]polardbx-cdc-format


数据整形模块,对物理binlog进行格式转换(增加列,删减列,物理库表转化为逻辑库表名称等),保证转换后的数据格式和逻辑库表Schema始终保持一致的状态。


 [模块]polardbx-cdc-meta


元数据管理模块,以时间线为基准,对PolarDB-X的所有逻辑库表和物理库表进行历史版本维护,可以构建出给定任一时间点的Schema快照,是DDL处理和Binlog整形的基础支撑。此外,该模块还维护了CDC系统库表的Sql脚本定义(src/main/resources/db/migration),CDC使用Flyway进行表结构的管理。


 [模块]polardbx-cdc-monitor


监控模块,一个内置的监控实现,用来管理和维护监控事件。在运行态,监控信息会通过该模块发送到daemon进程。


 [模块]polardbx-cdc-protocol


数据传输协议定义模块,cdc使用gRpc和protobuf来进行数据的传输,此模块定义了数据协议接口和数据交换格式,核心代码介绍如下:


包(文件)名称  功能简介 
com.aliyun.polardbx.binlog.protocol  包含了Task节点和Dumper节点间的数据交换格式定义 
com.aliyun.polardbx.rpc.cdc  包含了Dumper节点和CN节点间的数据交换格式定义


 [模块]polardbx-cdc-storage


数据存储模块,基于rocksdb进行了封装扩展,当内存资源不足时(如大事务、big column等场景),用来将内存数据转存到磁盘  


 [模块]polardbx-cdc-task


核心任务模块,可以认为是CDC的kernel,最核心的业务逻辑诸如事务的排序、整形、合并、DDL的处理、元数据的生命周期维护等操作,均在此模块完成。核心代码介绍如下:


包(文件)名称  功能简介 
com.aliyun.polardbx.binlog.extractor 包含了一级排序功能,对每个DN的原始物理Binlog进行解析,并按照TSO进行排序,辅以数据整形、元数据历史版本维护、ddl处理等功能 
com.aliyun.polardbx.binlog.merge  包含了全局排序功能,接收一级排序输出的有序队列,进行多路归并,输出全局有序的事务队列
com.aliyun.polardbx.binlog.collect 包含了事务合并功能,从全局有序的事务队列提取数据,合并具有相同事务id的局部事务,得到完整全局事务 
com.aliyun.polardbx.binlog.transmit 包含了数据传输功能,排序合并后的数据通过传输模块发送到下游Dumper组件 
com.aliyun.polardbx.binlog.extractor 包含了一级排序功能,对每个DN的原始物理Binlog进行解析,并按照TSO进行排序,辅以数据整形、元数据历史版本维护、ddl处理等功能 


 [模块]polardbx-cdc-transfer

一个内置的转账程序,通过运行该转账程序,可以对CDC全局Binlog的正确性进行基本的验证,使用方法后文会有详述。


三、库表说明


CDC的系统元数据表保存在GMS元数据库,下面对元数据表进行介绍。


 binlog_system_config。


系统参数信息表,保存系统级的参数配置。  


 binlog_task_config


系统运行时拓扑配置表,保存dumper和task的配置信息,如指定在哪个节点执行、Rpc端口值、运行内存等。


 binlog_node_info


节点信息表,保存了每个节点的资源配置和运行时状态等信息,每个Node对应于表中的一条记录。


 binlog_dumper_info


dumper运行状态信息表,每个Dumper进程对应表中一条记录。


 binlog_task_info


task运行状态信息表,每个Task进程对应表中一条记录。


 binlog_logic_meta_history


逻辑库表元数据信息历史记录表,用于记录每条逻辑DDL SQL及其对应的库表拓扑信息。


 binlog_phy_ddl_history


物理库表元数据信息里是记录表,用于记录每条物理DDL SQL。


 binlog_oss_record


binlog文件信息表,每个逻辑Binlog文件对应于表中的一条记录。


 binlog_polarx_command


命令信息表,CDC和CN之间部分交互通过该表来完成,如触发全量元数据的初始化等。


 binlog_schedule_history


调度历史记录表,集群每发生一次重新调度(如:节点宕机触发Rebalance),会在该表中记录一条历史信息。


   binlog_storage_history


DN节点历史记录表,PolarDB-X进行水平扩容或者缩容时,DN数量会发生变更,每次变更都会通过“打标”的方式通知给CDC,CDC会通过该表按照时间线记录所有的变更历史。Dumper进程每次启动时会查询逻辑Binlog文件中记录的最大tso,发送给Task进程,Task会通过该tso从binlog_storage_history表中查询到当前应该连接的DN列表。


 binlog_env_config_history


参数变更历史表,CDC中有部分参数需要和时间线绑定,该表用来记录这些参数的变更历史,和binlog_storage_history类似,系统会以tso为基准,使用对应时间段的参数配置。


 binlog_schema_history


Flyway执行历史记录表。


四、开发指引


下面来介绍一下,如何基于源码搭建开发环境,方便进行二次开发或代码调试。


 JDK最低要求1.8。


 准备一个PolarDB-X实例(启动CN+DN即可,不要启动CDC组件),可通过pxd方式部署,或通过源码编译安装


 准备polardbx-sql源码,并执行命令mvn install -D maven.test.skip=true -D env=release ,主要是为了得到polardbx-parser包,cdc引用了该模块。


准备polardbx-cdc源码,并调整dev.properties中的配置,根据实际情况调整,可能需要调整的配置有。


polardbx.instance.id
mem_size
metaDb_url
metaDb_username
metaDbPasswd
polarx_url
polarx_username
polarx_password
dnPasswordKey


1)polardbx-cdc根目录下执行mvn compile-D maven.test.skip=true -D env=dev,编译CDC源码。


2)启动Daemon进程,直接运行com.aliyun.polardbx.binlog.daemon.DaemonBootStrap即可。


3)启动Task进程,运行com.aliyun.polardbx.binlog.TaskBootStrap,并指定入参“taskName=Final”。


4)启动Dumper进程,运行

com.aliyun.polardbx.binlog.dumper.DumperBootStrap,并指定入参

“taskName=Dumper-1”。


5)PolarDB-X Server,执行一些Sql,观察{HOME}/binlog目录下是否会产生Binlog数据。


create database transfer_test;
CREATE TABLE `transfer_test`.`accounts` (
    `id` int(11) NOT NULL,
    `balance` int(11) NOT NULL,
  `gmt_created` datetime not null,
    PRIMARY KEY (`id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8  dbpartition by hash(`id`) tbpartition by hash(`id`) tbpartitions 2;
INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (1,100,now());
INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (2,100,now());
INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (3,100,now());
INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (4,100,now());
INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (5,100,now());
INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (6,100,now());
INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (7,100,now());
INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (8,100,now());
INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (9,100,now());
INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (10,100,now());


1)Docker安装一个MySQL,建议安装8.0版本,提供一个安装示例如下:


docker run -itd --name mysql_3309 -p 3309:3306 -e MYSQL_ROOT_PASSWORD=root mysql
登录dokcer实例:docker exec -it mysql_3309  bash
编辑/etc/mysql/my.cnf,
  a. 增加如下配置来关闭Gtid (polardbx-cdc全局Binlog暂不支持Gtid)
   gtid_mode=OFF
   enforce_gtid_consistency=OFF
  b. 更改server id,避免与主库重复
   server_id = 2
重启docker实例:docker restart mysql_3309


2)MySQL客户端登录新安装的MySQL,并执行如下命令,观察accounts表等信息是否已经同步到MySQL。


stop slave;
reset slave;
CHANGE MASTER TO
    MASTER_HOST='xxx',
    MASTER_USER='xxx',
    MASTER_PASSWORD='xxx',
    MASTER_PORT=xxx,
    MASTER_LOG_FILE='binlog.000001',
    MASTER_LOG_POS=4,
    MASTER_CONNECT_RETRY=100;
start slave;


3)还可以运行tranfer工程下的转账程序,进行转账场景下的测试,启动入口类:com.aliyun.polardbx.binlog。


transfer.Main,开启TSO和不开启TSO会看到不一样的现象,前者会保证强一致(即任意时刻从下游MySQL都能查询到一致的余额),后者只保证最终一致。停止测试程序之后,可以用下面的SQL,验证两边的数据是否完全一致SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', id, balance, CONCAT(ISNULL(id), ISNULL(balance))))AS UNSIGNED)) AS checksum FROM accounts


五、核心代码


下面给出CDC Stream链路中的一些核心类,这些构成了CDC运行时的骨架,提纲挈领方便快速上手。


 负责对物理Binlog进行处理的核心组件


com.aliyun.polardbx.binlog.extractor.BinlogExtractor com.aliyun.polardbx.binlog.extractor.filter.RtRecordFilter com.aliyun.polardbx.binlog.extractor.filter.TransactionBufferEventFilter com.aliyun.polardbx.binlog.extractor.filter.RebuildEventLogFilter com.aliyun.polardbx.binlog.extractor.filter.MinTSOFilter com.aliyun.polardbx.binlog.extractor.DefaultOutputMergeSourceHandler


 负责对物理Binlog进行局部排序的核心组件


com.aliyun.polardbx.binlog.extractor.sort.Sorter


 负责进行全局排序&归并的核心组件


com.aliyun.polardbx.binlog.merge.LogEventMerger


 负责进行事务合并的核心组件


com.aliyun.polardbx.binlog.collect.handle.TxnShuffleStageHandler com.aliyun.polardbx.binlog.collect.handle.TxnSinkStageHandler  


 负责进行数据传输的核心组件


com.aliyun.polardbx.binlog.transmit.LogEventTransmitter


 负责进行数据存储的核心组件


com.aliyun.polardbx.binlog.storage.LogEventStorage


 负责对逻辑binlog文件进行数据写入的核心组件


com.aliyun.polardbx.binlog.dumper.dump.logfile.LogFileGenerator


 负责进行Dumper主备复制的核心组件


com.aliyun.polardbx.binlog.dumper.dump.logfile.LogFileCopier


 负责进行对整体运行时进行全局调度的核心组件


com.aliyun.polardbx.binlog.daemon.schedule.TopologyWatcher


六、总结


本文主要介绍了polardbx-cdc的代码工程结构,列出了元数据表清单,展示了本地开发调试环境的搭建流程,并在最后给出了最为核心的一些功能组件。希望读者能够基于本文的介绍,并结合附录中给出的一些资料,通过实际动手,快速完成对polardbx-cdc的代码入门。后续我们会推出一系列文章,聚焦于每个点,对源码进行更为精细的解读。


附录

附1 PolarDB-X全局Binlog解读

附2 PolarDB-X全局Binlog解读之DDL

附3 PolarDB-X CN代码结构

附4 PolarDB-X CN启动流程 



相关实践学习
快速体验PolarDB开源数据库
本实验环境已内置PostgreSQL数据库以及PolarDB开源数据库:PolarDB PostgreSQL版和PolarDB分布式版,支持一键拉起使用,方便各位开发者学习使用。
相关文章
|
1月前
|
存储 SQL 关系型数据库
Mysql学习笔记(二):数据库命令行代码总结
这篇文章是关于MySQL数据库命令行操作的总结,包括登录、退出、查看时间与版本、数据库和数据表的基本操作(如创建、删除、查看)、数据的增删改查等。它还涉及了如何通过SQL语句进行条件查询、模糊查询、范围查询和限制查询,以及如何进行表结构的修改。这些内容对于初学者来说非常实用,是学习MySQL数据库管理的基础。
131 6
|
1月前
|
关系型数据库 MySQL 分布式数据库
零基础教你用云数据库PolarDB搭建企业网站,完成就送桌面收纳桶!
零基础教你用云数据库PolarDB搭建企业网站,完成就送桌面收纳桶,邀请好友完成更有机会获得​小米Watch S3、小米体重称​等诸多好礼!
零基础教你用云数据库PolarDB搭建企业网站,完成就送桌面收纳桶!
|
20天前
|
存储 SQL Apache
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
Apache Doris 是一个基于 MPP 架构的高性能实时分析数据库,以其极高的速度和易用性著称。它支持高并发点查询和复杂分析场景,适用于报表分析、即席查询、数据仓库和数据湖查询加速等。最新发布的 2.0.2 版本在性能、稳定性和多租户支持方面有显著提升。社区活跃,已广泛应用于电商、广告、用户行为分析等领域。
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
|
27天前
|
存储 关系型数据库 MySQL
MySQL vs. PostgreSQL:选择适合你的开源数据库
在众多开源数据库中,MySQL和PostgreSQL无疑是最受欢迎的两个。它们都有着强大的功能、广泛的社区支持和丰富的生态系统。然而,它们在设计理念、性能特点、功能特性等方面存在着显著的差异。本文将从这三个方面对MySQL和PostgreSQL进行比较,以帮助您选择更适合您需求的开源数据库。
102 4
|
1月前
|
存储 JSON Ubuntu
时序数据库 TDengine 支持集成开源的物联网平台 ThingsBoard
本文介绍了如何结合 Thingsboard 和 TDengine 实现设备管理和数据存储。Thingsboard 中的“设备配置”与 TDengine 中的超级表相对应,每个设备对应一个子表。通过创建设备配置和设备,实现数据的自动存储和管理。具体操作包括创建设备配置、添加设备、写入数据,并展示了车辆实时定位追踪和车队维护预警两个应用场景。
60 3
|
20天前
|
关系型数据库 分布式数据库 数据库
锦鲤附体 | PolarDB数据库创新设计赛,好礼不停!
锦鲤附体 | PolarDB数据库创新设计赛,好礼不停!
|
1月前
|
SQL JSON 关系型数据库
MySQL是一个广泛使用的开源关系型数据库管理系统,它有许多不同的版本
【10月更文挑战第3天】MySQL是一个广泛使用的开源关系型数据库管理系统,它有许多不同的版本
147 5
|
1月前
|
SQL 关系型数据库 MySQL
创建SQL数据库的基本步骤与代码指南
在信息时代,数据管理显得尤为重要,其中数据库系统已成为信息技术架构的关键部分。而当我们谈论数据库系统时,SQL(结构化查询语言)无疑是其中最核心的工具之一。本文将详细介绍如何使用SQL创建数据库,包括编写相应的代码和必要的步骤。由于篇幅限制,本文可能无法达到您要求的2000字长度,但会尽量涵盖创建数
51 3
|
1月前
|
关系型数据库 分布式数据库 数据库
PolarDB 开源:推动数据库技术新变革
在数字化时代,数据成为核心资产,数据库的性能和可靠性至关重要。阿里云的PolarDB作为新一代云原生数据库,凭借卓越性能和创新技术脱颖而出。其开源不仅让开发者深入了解内部架构,还促进了数据库生态共建,提升了稳定性与可靠性。PolarDB采用云原生架构,支持快速弹性扩展和高并发访问,具备强大的事务处理能力及数据一致性保证,并且与多种应用无缝兼容。开源PolarDB为国内数据库产业注入新活力,打破国外垄断,推动国产数据库崛起,降低企业成本与风险。未来,PolarDB将在生态建设中持续壮大,助力企业数字化转型。
91 2
|
13天前
|
SQL 关系型数据库 MySQL
12 PHP配置数据库MySQL
路老师分享了PHP操作MySQL数据库的方法,包括安装并连接MySQL服务器、选择数据库、执行SQL语句(如插入、更新、删除和查询),以及将结果集返回到数组。通过具体示例代码,详细介绍了每一步的操作流程,帮助读者快速入门PHP与MySQL的交互。
28 1

相关产品

  • 云原生数据库 PolarDB
  • 下一篇
    无影云桌面