探究 Canal EventParser 的设计与实现奥妙

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
全局流量管理 GTM,标准版 1个月
简介: 探究 Canal EventParser 的设计与实现奥妙

本文将从三个方面深度剖析 EventParser 组件。


  • 从官方文档看 EventParser 的设计思想
  • 从 EventParser 初始化了解内部的是可配置项
  • 从 EventParser 的启动窥探其工作实现原理

温馨提示:本篇篇幅较长,如果耐心阅读一定会有不错的收获,为了提高阅读体验,本文所有源码都是通过截图方式,大家可以重点阅读对应的文字说明,并在文末进行了总结。

1、官方文档看 EventParser


首先我们先从官方文档来看 EventParser 的整体设计,其架构设计图如下所示:

bf3cc369bc7c3e480b0693bbcfee62ce.jpg

上述图罗列出了 EventParser 的整体工作流程图,其关键步骤如下:


  1. 从 Log Position 管理器中获取上一次解析的日志位点。
  2. 向 Mysql Master 节点发送 BINLOG_DUMP 请求。
  3. Mysql Master 节点从 Slave 端传入的日志位点开始向从节点推送 binlog 日志。
  4. Slave 接收 binlog 日志,调用 BinlogParser 解析 binlog日志。
  5. 将解析后的结构化数据传入到 EventSink 组件。
  6. 定时记录解析 binlog 的日志,以便重启后继续进行增量订阅。
  7. 上图中还罗列一个HA 特性,即需要同步的 Master 如果宕机,可以从它的其他从节点继续同步 binlog 日志,避免单点故障。


官方文档有助于理解 EventParser 组件的实现原理,但关于如何使用 EventParser 的篇幅较少,故接下来将从源码的角度来反推 EventParser 的特性以及详细的工作实现原理,以便指导我们如何更好的使用 EventParser。


2、源码剖析 EventParser 初始化


从上篇文章我们即可得知,EventParser 组件是 Canal Instance 的四大核心组件之一,那本节的故事就从 CanalInstanceWithManager 的 initEventParser 方法开始。

45e6150e63267b9a175f3761b4a32aa8.png


Step1:获取数据库的连接信息,上面的代码就是集合的基本操作,但从上面的代码可以窥探如何配置数据库相关的地址信息。配置 canal instance 中 数据库的地址,用户名密码有如下几种方式(CanalParamter):


  • 单库场景配置方式一:CanalParmeter 中提供了 masterAddress、masterUsername、masterPassword、standbyAddress、standbyUsername、standbyPassword 6 个属性分别用来指定主库与从库的信息,配置了从库的目的是提供 HA 机制。
  • 单库场景配置方式二:CanalParamter 提供的 List
    dbAddresses 方式进行配置,该集合的第一个元素为主库地址、第二个元素为从库地址,其数据库用户名通过 dbUsername、dbPassword 来配置。
  • 多库场景:CanalParmeter 提供了 List< List< DataSourcing>> groupDbAddresses 属性用来设置 mysql 组,例如 MySQL 分库分表。groupDbAddresses 的第一个元素为主库的地址列表,第二个元素为从库的地址列表。


温馨提示:这里的用户名与密码是在对应服务器用于进行 binlog 日志同步的账号信息。


关于多库场景的配置,再详细举例如下:


cce30da335daacd0836f74596c9d5ace.png

其对应的初始化代码如下:

09895031e16a4dfb344c635848ac253c.png

CanalInstanceWithManager#initEventParser

e80432a598b8b388cbd5218f095173c6.png

Step2:根据配置的 MySQL 构建 EventParser 实例。这里有如下几个关键点:


  • 如果配置的 MySQL 地址是组方式则会创建 GroupEventParser,其内部会维护一个 EventParser 列表。
  • 通过调用 doInitEventParser 方法创建 EventParser 实例。


接下来我们将重点查看 doInitEventParser 的实现细节。


dd95763ae8f11a2bfc5aced28eb1c9a2.png

Step3:从这里可以看出 Canal 目前并不支持 Oracle 数据,只支持 MySQL 与 本地 binlog 文件(直接根据 binlog 日志文件解析)。


温馨提示:接下来将重点探讨基于 MySQL binlog 日志,并且会忽略与阿里云相关的 RDS 、tsdb 等数据库辅助支持,只关系与开源 MySQL 相关的处理逻辑。

a480fa67b92de3aff5481c4091b57070.png

Step3:MySQL 的 binlog 事件解析器实现类为 MysqlEventParser,这里我们重点来阐述一下这些参数的含义:


  • destination
    Canal Instance 实例的名称。
  • connectionCharset
    字符集,解析 binlog 时会将指定的字节数据使用该编码级进行转换,默认为UTF-8。
  • connectionCharsetNumber
    字符集的数字表现形式,UTF8对应的值为 33,该值在与 MySQL 的交互协议包中需要被用到,这里 Canal 处理的不是特别好,最好该属性设置为只读,由 connectionCharset 联动进行设置。
  • defaultConnectionTimeoutInSeconds
    MySQL 默认连接超时时间,因为 Canal 会伪装为 MySQL 服务器的 Slave 节点,需要向 MySQL Master 发送请求,故需要先创建链接,这里就是创建连接的默认超时时间,默认为 30s。
  • sendBufferSize
    用于网络通道发送端缓存区,目前在 Canal 中网络通道的实现类为 BioSocketChannelPool、NettySocketChannelPool,从代码的角度来看,目前这个参数并不会生效,即使用操作系统的默认值。
  • receiveBufferSize
    用于忘了通道接收缓存区大小,目前同 sendBufferSize 参数,并不会生效。
  • detectingEnable
    是否开启心跳检测,默认为开启。
  • detectingSQL
    心跳检测语句,例如 select 1,show master status 等。
  • detectingIntervalInSeconds
    心跳间隔检测,默认为 3s。
  • slaveId
    从服务器的 id,在同一个 MySQL 复制组内不能重复。

cc9279c73394c8cbd6a52c2a0dbc00d9.png

Step4:如果设置了 CanalPrameter 的 Listpositions 属性,则将其解析为 EntryPosition 实体,我们来看一下如何表征 binlog 日志的位点信息。

885b3469eb80909646ead7fbdfa5a65f.png

其主要的核心参数如下:


  • long timestamp
    时间戳,用时间戳来表示位置
  • String journalName
    binlog 日志的文件名,例如 mysql-bin.000001。
  • Long position
    使用偏移量来表示具体位点。
  • long serverId
    设置 master 的 id。
  • String gtid
    全局事务ID。

温馨提示:实践指导,CanalParameter 的 List< String> positions 不支持组模式,只能设置一组,即第一个元素为主,第二个元素可以为从节点,该属性非必填。

f1327668f8e9ad0c5d0da33562adb5a8.png

Step5:继续设置参数,具体看一下各个参数的含义:


  • fallbackIntervalInSeconds
    如果 MySQL 主节点宕机,Canal 支持切换到其从节点继续同步 binlog 日志,但为了数据的完整性,可以设置一个回退时间,即会造成数据重复下发,但尽量不丢失,该值默认为 60s。
  • profilingEnabled
    是否开启性能采集,主要采集的是一批日志经过 EventSink 组件处理到完成 存入EventStore 的时间消耗。
  • filterTableError
    是否忽略表过滤异常,默认为 false,表过滤会在后续文章中详细介绍。
  • parallel
    解析、canal 接入 prometheus 采集监控数据是否支持并发,默认为 false。
  • isGTIDMode
    是否开启 gtid 模式

1bc333c74e5c60b3abb1ec349831b5e4.png

Step6:继续填充解析器相关参数,其重点实现如下:


  • transactionSize
    Canal 提供了一种机制,尝试将一个数据库事务中所有的变更日志一起进行处理,这个为处理缓存事务日志的缓存区长度,默认为 1024。
  • logPositionManager
    初始化日志位点管理器,Canal 提供了基于内存、zookeeper、内存与zookeepr混合管理器等日志位点管理器,这个后续会详细介绍。
  • AviaterRegexFilter
    提供了基于 aviater 的正则表达式,对 table 名称进行过滤。
  • blackFilter
    canal 提供了黑名单配置,提供黑名单正则表达式对 table 名称进行过滤。

f6d6c8037bfa78977cae1faf96bf79b2.png

Step7:如果解析器是 MySQL 解析器,提供了 HA 机制,即如果 MySQL Master 宕机,Canal 还能主动切换到 MYSQL Slave 节点,继续同步 binlog 日志。

3、EventParser 工作流程详解


上面已经详细介绍了EventParser 的初始化过程,有助于大家对 CanalInstance 相关配置参数的理解,本节将相信介绍 EventParser 的工作流程,其实现代码入口为 EventParser 的 start 方法。本文重点将探究 MySQL binlog 日志的解析,故其实现类为:MysqlEventParser。


MysqlEventParser 的 start 方法代码如下:

56cfee061687a4608262474b2f79a8f8.png

主要调用的是其父类的 start 方法。接下来对其进行详细解读。

c64b0087fa2a9d5db51a4fedfb458a01.png

Step1:创建环形缓存区,其主要的作用是 Canal 在解析 binlog 日志后,会尽量尝试将一个数据库事务所产生的全部变更日志(一个事务所有变更数据)当成一个整体提交给 EventSink 组件,从而 Canal 的消费方能一次将一个事务的数据全部同步,数据的完整性得到了保证。


温馨提示:关于环形缓存区的具体实现细节将在下文详细介绍,这里先简单说一下 Canal 目前无法百分之百保证一个事务的数据就一定是一次消费,如果一个事务产生的变更日志超过了环形缓存区的容量,则会被强制提交消费,一个事务的数据会被分开消费,默认环形缓存区的长度为 1024.

77e3bbe96b617c53ace821f17f532033.png

Step2:构建一个 binlog 解析器,该方法在 AbstractEventParser 中为一个抽象方法,具体的实现在其子类中,其代码截图如下:


AbstractMysqlEventParser,在 MySQL binlog 解析的实现类为 LogEventConvert,所处的模块为 parse,该部分是整个 Canal EventParser 的核心,将在后续文章中单独详细介绍。

821b440ebbfe40a91cd3a64d84a67909.png

2a9a00733e2ecf26c9afd6116d262cf6.png

Step3:启动一个独立的线程来负责 binlog 的解析,其线程包含了 Canal Instance 的 destination、address 等信息,方便利用 jstack 去诊断 binlog 解析相关问题。接下来就是解读该线程的 run 方法,从而探究 binlog 的解析流程。

19e691d619fa069d61661484f3eb35b7.png

Step31:首先创建一条到需要解析 binlog 日志的服务器,例如需要同步 192.168.1.166:3306 这个数据库实例的 binlog 日志,那 Canal 首先会使用拥有该库复制权限的账号去创建一条TCP连接,本文并不会详细去介绍这里的实现细节,这里代表一个领域,即需要知晓 MySQL 通讯协议,通过TCP与MySQL建立连接,并按照 MySQL 通讯协议发送命令,例如 select、dump 等请求,这个后续在学完 Canal 等核心组件后,可能会深入学习该部分的内容,这里我重点点出其实现的几个关键要点:


  • 首先创建一条TCP连接,连接到 MySQL 服务器,Canal 提供了 BIO 与 Netty 两种实现方式。
  • TCP 三次握手后成功建立TCP连接后,需要与 MySQL  进行握手,完成协议约定,客户端登录校验等,例如握手实现代码见:MysqlConnector negotiate。
  • 一言以蔽之,MySqlConnection 的职责就是实现一个 MySQL 客户端。其效果等同于实现我们常用的 SQL 连接客户端,关于这方面的编程其实不难,如果大家有志成为一名数据库中间件方面的技术人员,只需按照 MySQL 官方文档中有关通讯协议即可。

01e0fb549bc17b1e5235910d4ae71dcc.png

Step32:发送心跳包,这里的关键实现点如下:


  • 利用 Timer 实现定时调度,心跳包发送间隔通过 detectingIntervalInSeconds 指定。
  • 心跳包主要是构建一个 CanalEntry.Entry,其类型为EntryType.HEARTBEAT。
    心跳包并不是发送给远端 MySQL 服务器,而是将 Entry 下发到 EventSink 组件。
  • 该心跳包的用意合作,在这里先留一个伏笔,后续文章会一一揭晓。

22a6c80237bf7320d395a6dc77dd4d75.png

Step33:执行发送 dump 命令正式从 MySQL 服务器接收 binlog 日志之前的准备工作,具体准备工作如下:


  • 首先再创建一条专属数据库连接,主要用于查找 MySQL 的一些配置信息,统称元数据。
  • 向 MySQL 服务器发送 show variables like 'binlog_format‘ 语句查询服务端配置的 binlog 格式,MySQL 支持 STATEMENT、ROW、MIXED 三种模式。
  • 向 MySQL 服务端发送 show variables like 'binlog_row_image' 语句查询服务器端配置的 binlog_row_image。


扩展阅读:binlog_format 我相信大家都不陌生,对 binlog_row_image 见过的估计比较少,那 binlog_row_image 有何作用呢?


binlog_row_image 主要是在 binlog_format 为 ROW 模式下,控制记录 binlog 事件的方式,binlog 的作用是记录数据的变化,例如 update 请求,需要记录一行记录变化之前的数据以及变化后的数据,在 binlog event 分别用 before 、after 记录变化前后的数据,但有一个问题,是只发生变化的字段的前后值呢,还是记录一行中所有字段修改前后的值呢?故引入了 binlog_row_image,该值支持如下选项:


  • full:在 before 与 after 中记录所有字段的值,针对每一个字段,使用 update 来表示该字段是否发生变化,该选项为默认值。
  • minimal:在 before 与 after 中只记录发生变化的字段,并且包含能够唯一识一行数据的值,例如主键。
  • noblob:在 before 与 after 中记录所有的列值,但 BLOB 与 TEXT 类型的字段列除外(如未更改)。

d7194b3265184c9217b8a57f2896cee7.png

Step34:向 MySQL 服务端发送 show variables like 'server_id' 语句,查询服务端配置的 serverId。

5dc3ea7b551ed278b9c89e0481db3ad6.png

Step35:通过日志位点管理器获取需要同步的位点,后续会详细展开。

0d8927e0c50a647d925a5fcb8f04c616.png

Step36:通过向 MySQL 发送 dump 请求,从服务器接收 binlog 日志,并进行处理,为了提高性能,Canal 支持该过程进行并行化处理,通过 parallel 属性设置是否支持并发,从而引入 disruptor 高性能并发框架,详情后在后续文章中详细解读。

6671c1703ed7af64f5cecb862e0b2bd2.png

Step37:通过接收到 MySQL 服务端返回的日志并解析为 Canal.Entry 对象,并传输到 EventSink 组件。


上述过程反复执行,持续完成 binlog 日志的解析,实现数据的同步。


4、总结


本文首先结合官方文档了解了 EventParser,但 Canal 的官方手册并不特别详细,故需要我们通过源码去反推 canal instance 中关于 EventParser 有哪些参数,并且这些参数有何意义,是如何工作的。

众所周知,EventParser 的主要职责就是与 MySQL 服务器“打交道”,将自己伪装成 MySQL 服务器的一个从节点,从服务器端接收 binlog 日志,并将二进制流解码成 Canal.Entry,看似简单,但实现起来还是比较困难的,下面这些方面是后续值得我们研究探讨的点:


  1. 环形缓存区的使用与技巧。
  2. 实现 MySQL 通讯协议,向 MySQL 发送相关SQL语句并解析返回结果\,具体由 MysqlConnection 对象实现。
  3. 日志解析位点管理机制。
  4. 基于GTID、日志位点偏移量两种方式定位 binlog 日志方式。
  5. dump 命令的发送、高性能设计( disruptor 框架的引入)
相关文章
|
监控 负载均衡 应用服务中间件
Passenger作用及原理梳理
我们在部署rails应用时,大多时候都使用Nginx+Passenger的方式部署,本文主要对此架构下 Passenger的作用及其工作原理进行梳理。 一、什么是Passenger? Phusion Passenger是一个开源的Web应用服务器,它能够处理HTTP请求,管理进程和资源、 系统监控以
682 0
|
6月前
|
消息中间件 Kafka 数据处理
超硬核!详解Apache Hudi灵活的Payload机制
超硬核!详解Apache Hudi灵活的Payload机制
277 3
|
消息中间件 Kafka 数据处理
超硬核解析!Apache Hudi灵活的Payload机制
Apache Hudi 的Payload是一种可扩展的数据处理机制,通过不同的Payload我们可以实现复杂场景的定制化数据写入方式,大大增加了数据处理的灵活性。Hudi Payload在写入和读取Hudi表时对数据进行去重、过滤、合并等操作的工具类,通过使用参数 &quot;hoodie.datasource.write.payload.class&quot;指定我们需要使用的Payload class。 ​
2183 0
超硬核解析!Apache Hudi灵活的Payload机制
|
存储 安全 Java
「望仔细品读!」☕【Java原理探索】史上最清晰的探究和分析【Safe Point+Safe Region】(上)
「望仔细品读!」☕【Java原理探索】史上最清晰的探究和分析【Safe Point+Safe Region】(上)
155 0
「望仔细品读!」☕【Java原理探索】史上最清晰的探究和分析【Safe Point+Safe Region】(上)
|
消息中间件 分布式计算 网络协议
SparkStreaming 案例_解释说明 | 学习笔记
快速学习 SparkStreaming 案例_解释说明
SparkStreaming 案例_解释说明 | 学习笔记
|
分布式计算 大数据 API
SparkStreaming 原理_问题提出 | 学习笔记
快速学习 SparkStreaming 原理_问题提出
SparkStreaming 原理_问题提出 | 学习笔记
|
分布式计算 Java Linux
【Spark 3.0-JavaAPI-pom】体验JavaRDD函数封装变化
【Spark 3.0-JavaAPI-pom】体验JavaRDD函数封装变化
191 0
【Spark 3.0-JavaAPI-pom】体验JavaRDD函数封装变化
|
canal 关系型数据库 MySQL
「从零单排canal 07」 parser模块源码解析(一)
「从零单排canal 07」 parser模块源码解析(一)
227 0
「从零单排canal 07」 parser模块源码解析(一)
|
canal SQL 存储
「从零单排canal 07」 parser模块源码解析(二)
「从零单排canal 07」 parser模块源码解析(二)
296 0
「从零单排canal 07」 parser模块源码解析(二)
|
消息中间件 SQL 分布式计算
Spark Sreaming实战(二)-小试流式处理
Spark Sreaming实战(二)-小试流式处理
108 0
Spark Sreaming实战(二)-小试流式处理