开源分布式数据库PolarDB-X源码解读——PolarDB-X源码解读(八):GlobalBinlog的一生

本文涉及的产品
云原生数据库 PolarDB MySQL 版,通用型 2核8GB 50GB
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
简介: 开源分布式数据库PolarDB-X源码解读——PolarDB-X源码解读(八):GlobalBinlog的一生

本篇将对Binlog的产生,以及如何通过系统处理并最终生成Global Binlog的过程进行分析。  


一、概述


Global Binlog的一生是指从原始Binlog产生,到最终Global Binlog生成期间产生的故事,本文会详细介绍Binlog拉取,数据整形合并,以及最终生成Global Binlog期间经过的关键流程。Global Binlog涉及到Task和Dumper组件,分别针对Binlog整形合并以及落盘过程,下面会介绍这两个组件对binlog处理的相关代码。


                             


二、Task组件


当用户向PolarDB-X写入数据时,最终数据会落盘到底层DN上,DN会产生原始binlog,task组件会拉取到原始binlog,整形合并成Global Binlog后,发送给下游Dumper组件,下面我们通过查看Task的核心代码来了解一下整个原始Binlog的处理流程。


                               



我们可以以代码com.aliyun.polardbx.binlog.canal.core.BinlogEventProcessor(github地址:https://github.com/polardb/polardbx-cdc)为入口来查看整个binlog的关键入口class,查看binlog是如何拉取并投递到下游handler中。


         


在handler中,会通过一些列的filter,对binlog进行处理。在com.aliyun.polardbx.binlog.canal.core.handle.DefaultBinlogEventHandle代码中,我们可以看到如下逻辑


         


通过head对象的doNext方法,逐级遍历所有filter,最终对binlog event进行过滤和格式化处理,并输出逻辑event到下游。head对象为filter链表,初始化逻辑见下面代码。


         


Extractor中的filter chain如图:

         


在BinlogExtractor初始化时,会将一些列Filter加入到列表中,binlog event会按顺序逐个经过这些filter。


 com.aliyun.polardbx.binlog.extractor.filter.RtRecordFilter:会记录当前event下游逻辑处理的rt。


 com.aliyun.polardbx.binlog.extractor.filter.TransactionBufferEventFilter:处理事务相关event,并标记出tso事件。


     


当收到commit event时,会尝试执行tryDoNext。


      、、


事务的推送算法可以在

com.aliyun.polardbx.binlog.extractor.filter.TransactionStorage类中查看。com.aliyun.polardbx.binlog.extractor.filter.RebuildEventLogFilter负责binlog event过滤和整形流程,具体逻辑可以查看handle方法。整形的关键代码可以查看reformat方法。


         


Global binlog中,我们只关心QueryEvent、RowEvent(Insert/Update/Delete)和TableMapEvent,所以这里我们只处理了这些event的数据。


com.aliyun.polardbx.binlog.extractor.filter.MinTSOFilter会对事务进行初步过滤。数据最终会通过

com.aliyun.polardbx.binlog.extractor.DefaultOutputMergeSourceHandler推送给下游事务合并代码com.aliyun.polardbx.binlog.merge.MergeSource中,sourceId唯一标记了当前数据流id,queue保存事务索引txnKey。


       


com.aliyun.polardbx.binlog.merge.LogEventMerger中,会将所有DN对应的MergeSource的queue遍历提取出来。


       


com.aliyun.polardbx.binlog.merge.MergeController会保存对应的sourceId和数据,确保每个sourceId只会收到一条数据,下游在拉取数据时,会将保存在优先队列的数据pop出来,push到com.aliyun.polardbx.binlog.collect.LogEventCollector中,最终通过ringBuffer提供给com.aliyun.polardbx.binlog.transmit.LogEventTransmitter来投递给下游Dumper组件。


三、Dumper组件


当Global Binlog系统启动时,优先启动Task组件,监听端口。Dumper组件会尝试连接Task。Dumper组件收到Task推送的Global Binlog后,会对binlog进行最后的细节处理,并且把处理好的结果写入磁盘。下面我们从Dumper组件消费Task推送数据的入口来分析,整个binlog处理核心流程。在com.aliyun.polardbx.binlog.dumper.dump.logfile.LogFileGenerator类的start方法中,该方法会启动grpc连接Task端口。


         


代码里实现了com.aliyun.polardbx.binlog.rpc.TxnMessageReceiver接口,该接口会消费上游推送过来的所有数据,并在consume方法中,处理相关Global Binlog position等相关信息,并最终通过com.aliyun.polardbx.binlog.dumper.dump.logfile.BinlogFile写入到磁盘中。


private void consume(TxnMessage message, MessageType processType) throws IOException, InterruptedException {
            ...
        switch (processType) {
        case BEGIN:
             ...
            break;
        case DATA:
             ...
            break;
        case END:
             ...
            break;
        case TAG:
            currentToken = message.getTxnTag().getTxnMergedToken();
            if (currentToken.getType() == TxnType.META_DDL) {
                ...
            } else if (currentToken.getType() == TxnType.META_DDL_PRIVATE) {
                ...
            } else if (currentToken.getType() == TxnType.META_SCALE) {
                ...
            } else if (currentToken.getType() == TxnType.META_HEARTBEAT) {
                ...
            } else if (currentToken.getType() == TxnType.META_CONFIG_ENV_CHANGE) {
                ...
            }
            break;
        default:
            throw new PolardbxException("invalid message type for logfile generator: " + processType);
        }
    }


在consome方法中会逐个处理事务和数据相关的事件,如果事件打标了系统之间交互的tag,会针对相应的tag做一定的逻辑处理。


四、小结


本文对binlog的拉取、整形处理和最终落盘涉及到的关键流程进行了简单梳理,Global binlog的一生是从原始物理binlog到逻辑binlog的转变,代码中的原理可以参考全局Binlog解读之理论篇(https://zhuanlan.zhihu.com/p/462995079)。



相关实践学习
体验PolarDB-X透明分布式
PolarDB-X具备从单机到分布式的平滑演进能力,支持通过DDL将一张大表动态调整为分布式的分区表,结合分布式事务、以及兼容MySQL Binlog的数据回流,可完成单机到分布式的快速改造。本体验从PolarDB-X的Auto Partition,以及兼容MySQL Binlog的CDC能力两个方面演示透明分布式能力。
相关文章
|
2月前
|
存储 监控 分布式数据库
ClickHouse分布式数据库动态伸缩(弹性扩缩容)的实现
实现ClickHouse数据库的动态伸缩需要持续的维护和精细的操作。从集群配置到数据迁移,再到监控和自动化,每一步都要仔细管理以确保服务的可靠性和性能。这些活动可以显著提高应用的响应性和成本效率,帮助业务根据实际需求灵活调整资源分配。
138 10
|
3月前
|
存储 关系型数据库 分布式数据库
【赵渝强老师】基于PostgreSQL的分布式数据库:Citus
Citus 是基于 PostgreSQL 的开源分布式数据库,采用 shared nothing 架构,具备良好的扩展性。它以插件形式集成,部署简单,适用于处理大规模数据和高并发场景。本文介绍了 Citus 的基础概念、安装配置步骤及其在单机环境下的集群搭建方法。
187 2
|
4月前
|
存储 Cloud Native 关系型数据库
PolarDB开源:云原生数据库的架构革命
本文围绕开源核心价值、社区运营实践和技术演进路线展开。首先解读存算分离架构的三大突破,包括基于RDMA的分布式存储、计算节点扩展及存储池扩容机制,并强调与MySQL的高兼容性。其次分享阿里巴巴开源治理模式,涵盖技术决策、版本发布和贡献者成长体系,同时展示企业应用案例。最后展望技术路线图,如3.0版本的多写多读架构、智能调优引擎等特性,以及开发者生态建设举措,推荐使用PolarDB-Operator实现高效部署。
234 3
|
4月前
|
SQL 关系型数据库 分布式数据库
PolarDB开源数据库入门教程
PolarDB是阿里云推出的云原生数据库,基于PostgreSQL、MySQL和Oracle引擎构建,具备高性能、高扩展性和高可用性。其开源版采用计算与存储分离架构,支持快速弹性扩展和100%兼容PostgreSQL/MySQL。本文介绍了PolarDB的安装方法(Docker部署或源码编译)、基本使用(连接数据库、创建表等)及高级特性(计算节点扩展、存储自动扩容、并行查询等)。同时提供了性能优化建议和监控维护方法,帮助用户在生产环境中高效使用PolarDB。
1394 21
|
4月前
|
Cloud Native 关系型数据库 分布式数据库
PolarDB开源:云原生数据库的新篇章
阿里云自研的云原生数据库PolarDB于2023年5月正式开源,采用“存储计算分离”架构,具备高性能、高可用及全面兼容性。其开源版本提供企业级数据库解决方案,支持MySQL、PostgreSQL和Oracle语法,适用于高并发OLTP、核心业务系统等场景。PolarDB通过开放治理与开发者工具构建完整生态,并展望更丰富的插件功能与AI集成,为中国云原生数据库技术发展贡献重要力量。
417 17
|
4月前
|
存储 关系型数据库 分布式数据库
PolarDB开源进阶篇:深度解析与实战优化指南
PolarDB是阿里云开源的云原生数据库,采用计算-存储分离架构,结合高性能共享存储与Parallel Raft多副本一致性协议,实现微秒级延迟和卓越性能。本文深入解析其架构设计,涵盖智能调度层、性能优化技巧(如查询优化器调优和分布式事务提升)、高可用与容灾配置、扩展功能开发指南以及监控运维体系。同时,通过电商平台优化案例展示实际应用效果,并展望未来演进方向,包括AI结合、多模数据库支持及Serverless架构发展。作为云原生数据库代表,PolarDB为开发者提供了强大支持和广阔前景。
253 16
|
5月前
|
SQL 存储 分布式数据库
分布式存储数据恢复—hbase和hive数据库数据恢复案例
分布式存储数据恢复环境: 16台某品牌R730xd服务器节点,每台服务器节点上有数台虚拟机。 虚拟机上部署Hbase和Hive数据库。 分布式存储故障: 数据库底层文件被误删除,数据库不能使用。要求恢复hbase和hive数据库。
182 12
|
7月前
|
关系型数据库 分布式数据库 数据库
喜报|PolarDB开源社区荣获“2024数据库国内活跃开源项目”奖
喜报|PolarDB开源社区荣获“2024数据库国内活跃开源项目”奖
114 1
|
7月前
|
人工智能 监控 开发者
阿里云PAI发布DeepRec Extension,打造稳定高效的分布式训练,并宣布开源!
阿里云PAI发布DeepRec Extension,打造稳定高效的分布式训练,并宣布开源!
117 0

相关产品

  • 云原生数据库 PolarDB