本文将从三个方面深度剖析 EventParser 组件。
- 从官方文档看 EventParser 的设计思想
- 从 EventParser 初始化了解内部的是可配置项
- 从 EventParser 的启动窥探其工作实现原理
温馨提示:本篇篇幅较长,如果耐心阅读一定会有不错的收获,为了提高阅读体验,本文所有源码都是通过截图方式,大家可以重点阅读对应的文字说明,并在文末进行了总结。
1、官方文档看 EventParser
首先我们先从官方文档来看 EventParser 的整体设计,其架构设计图如下所示:
上述图罗列出了 EventParser 的整体工作流程图,其关键步骤如下:
- 从 Log Position 管理器中获取上一次解析的日志位点。
- 向 Mysql Master 节点发送 BINLOG_DUMP 请求。
- Mysql Master 节点从 Slave 端传入的日志位点开始向从节点推送 binlog 日志。
- Slave 接收 binlog 日志,调用 BinlogParser 解析 binlog日志。
- 将解析后的结构化数据传入到 EventSink 组件。
- 定时记录解析 binlog 的日志,以便重启后继续进行增量订阅。
- 上图中还罗列一个HA 特性,即需要同步的 Master 如果宕机,可以从它的其他从节点继续同步 binlog 日志,避免单点故障。
官方文档有助于理解 EventParser 组件的实现原理,但关于如何使用 EventParser 的篇幅较少,故接下来将从源码的角度来反推 EventParser 的特性以及详细的工作实现原理,以便指导我们如何更好的使用 EventParser。
2、源码剖析 EventParser 初始化
从上篇文章我们即可得知,EventParser 组件是 Canal Instance 的四大核心组件之一,那本节的故事就从 CanalInstanceWithManager 的 initEventParser 方法开始。
Step1:获取数据库的连接信息,上面的代码就是集合的基本操作,但从上面的代码可以窥探如何配置数据库相关的地址信息。配置 canal instance 中 数据库的地址,用户名密码有如下几种方式(CanalParamter):
- 单库场景配置方式一:CanalParmeter 中提供了 masterAddress、masterUsername、masterPassword、standbyAddress、standbyUsername、standbyPassword 6 个属性分别用来指定主库与从库的信息,配置了从库的目的是提供 HA 机制。
- 单库场景配置方式二:CanalParamter 提供的 List
dbAddresses 方式进行配置,该集合的第一个元素为主库地址、第二个元素为从库地址,其数据库用户名通过 dbUsername、dbPassword 来配置。 - 多库场景:CanalParmeter 提供了 List< List< DataSourcing>> groupDbAddresses 属性用来设置 mysql 组,例如 MySQL 分库分表。groupDbAddresses 的第一个元素为主库的地址列表,第二个元素为从库的地址列表。
温馨提示:这里的用户名与密码是在对应服务器用于进行 binlog 日志同步的账号信息。
关于多库场景的配置,再详细举例如下:
其对应的初始化代码如下:
CanalInstanceWithManager#initEventParser
Step2:根据配置的 MySQL 构建 EventParser 实例。这里有如下几个关键点:
- 如果配置的 MySQL 地址是组方式则会创建 GroupEventParser,其内部会维护一个 EventParser 列表。
- 通过调用 doInitEventParser 方法创建 EventParser 实例。
接下来我们将重点查看 doInitEventParser 的实现细节。
Step3:从这里可以看出 Canal 目前并不支持 Oracle 数据,只支持 MySQL 与 本地 binlog 文件(直接根据 binlog 日志文件解析)。
温馨提示:接下来将重点探讨基于 MySQL binlog 日志,并且会忽略与阿里云相关的 RDS 、tsdb 等数据库辅助支持,只关系与开源 MySQL 相关的处理逻辑。
Step3:MySQL 的 binlog 事件解析器实现类为 MysqlEventParser,这里我们重点来阐述一下这些参数的含义:
- destination
Canal Instance 实例的名称。 - connectionCharset
字符集,解析 binlog 时会将指定的字节数据使用该编码级进行转换,默认为UTF-8。 - connectionCharsetNumber
字符集的数字表现形式,UTF8对应的值为 33,该值在与 MySQL 的交互协议包中需要被用到,这里 Canal 处理的不是特别好,最好该属性设置为只读,由 connectionCharset 联动进行设置。 - defaultConnectionTimeoutInSeconds
MySQL 默认连接超时时间,因为 Canal 会伪装为 MySQL 服务器的 Slave 节点,需要向 MySQL Master 发送请求,故需要先创建链接,这里就是创建连接的默认超时时间,默认为 30s。 - sendBufferSize
用于网络通道发送端缓存区,目前在 Canal 中网络通道的实现类为 BioSocketChannelPool、NettySocketChannelPool,从代码的角度来看,目前这个参数并不会生效,即使用操作系统的默认值。 - receiveBufferSize
用于忘了通道接收缓存区大小,目前同 sendBufferSize 参数,并不会生效。 - detectingEnable
是否开启心跳检测,默认为开启。 - detectingSQL
心跳检测语句,例如 select 1,show master status 等。 - detectingIntervalInSeconds
心跳间隔检测,默认为 3s。 - slaveId
从服务器的 id,在同一个 MySQL 复制组内不能重复。
Step4:如果设置了 CanalPrameter 的 Listpositions 属性,则将其解析为 EntryPosition 实体,我们来看一下如何表征 binlog 日志的位点信息。
其主要的核心参数如下:
- long timestamp
时间戳,用时间戳来表示位置 - String journalName
binlog 日志的文件名,例如 mysql-bin.000001。 - Long position
使用偏移量来表示具体位点。 - long serverId
设置 master 的 id。 - String gtid
全局事务ID。
温馨提示:实践指导,CanalParameter 的 List< String> positions 不支持组模式,只能设置一组,即第一个元素为主,第二个元素可以为从节点,该属性非必填。
Step5:继续设置参数,具体看一下各个参数的含义:
- fallbackIntervalInSeconds
如果 MySQL 主节点宕机,Canal 支持切换到其从节点继续同步 binlog 日志,但为了数据的完整性,可以设置一个回退时间,即会造成数据重复下发,但尽量不丢失,该值默认为 60s。 - profilingEnabled
是否开启性能采集,主要采集的是一批日志经过 EventSink 组件处理到完成 存入EventStore 的时间消耗。 - filterTableError
是否忽略表过滤异常,默认为 false,表过滤会在后续文章中详细介绍。 - parallel
解析、canal 接入 prometheus 采集监控数据是否支持并发,默认为 false。 - isGTIDMode
是否开启 gtid 模式
Step6:继续填充解析器相关参数,其重点实现如下:
- transactionSize
Canal 提供了一种机制,尝试将一个数据库事务中所有的变更日志一起进行处理,这个为处理缓存事务日志的缓存区长度,默认为 1024。 - logPositionManager
初始化日志位点管理器,Canal 提供了基于内存、zookeeper、内存与zookeepr混合管理器等日志位点管理器,这个后续会详细介绍。 - AviaterRegexFilter
提供了基于 aviater 的正则表达式,对 table 名称进行过滤。 - blackFilter
canal 提供了黑名单配置,提供黑名单正则表达式对 table 名称进行过滤。
Step7:如果解析器是 MySQL 解析器,提供了 HA 机制,即如果 MySQL Master 宕机,Canal 还能主动切换到 MYSQL Slave 节点,继续同步 binlog 日志。
3、EventParser 工作流程详解
上面已经详细介绍了EventParser 的初始化过程,有助于大家对 CanalInstance 相关配置参数的理解,本节将相信介绍 EventParser 的工作流程,其实现代码入口为 EventParser 的 start 方法。本文重点将探究 MySQL binlog 日志的解析,故其实现类为:MysqlEventParser。
MysqlEventParser 的 start 方法代码如下:
主要调用的是其父类的 start 方法。接下来对其进行详细解读。
Step1:创建环形缓存区,其主要的作用是 Canal 在解析 binlog 日志后,会尽量尝试将一个数据库事务所产生的全部变更日志(一个事务所有变更数据)当成一个整体提交给 EventSink 组件,从而 Canal 的消费方能一次将一个事务的数据全部同步,数据的完整性得到了保证。
温馨提示:关于环形缓存区的具体实现细节将在下文详细介绍,这里先简单说一下 Canal 目前无法百分之百保证一个事务的数据就一定是一次消费,如果一个事务产生的变更日志超过了环形缓存区的容量,则会被强制提交消费,一个事务的数据会被分开消费,默认环形缓存区的长度为 1024.
Step2:构建一个 binlog 解析器,该方法在 AbstractEventParser 中为一个抽象方法,具体的实现在其子类中,其代码截图如下:
AbstractMysqlEventParser,在 MySQL binlog 解析的实现类为 LogEventConvert,所处的模块为 parse,该部分是整个 Canal EventParser 的核心,将在后续文章中单独详细介绍。
Step3:启动一个独立的线程来负责 binlog 的解析,其线程包含了 Canal Instance 的 destination、address 等信息,方便利用 jstack 去诊断 binlog 解析相关问题。接下来就是解读该线程的 run 方法,从而探究 binlog 的解析流程。
Step31:首先创建一条到需要解析 binlog 日志的服务器,例如需要同步 192.168.1.166:3306 这个数据库实例的 binlog 日志,那 Canal 首先会使用拥有该库复制权限的账号去创建一条TCP连接,本文并不会详细去介绍这里的实现细节,这里代表一个领域,即需要知晓 MySQL 通讯协议,通过TCP与MySQL建立连接,并按照 MySQL 通讯协议发送命令,例如 select、dump 等请求,这个后续在学完 Canal 等核心组件后,可能会深入学习该部分的内容,这里我重点点出其实现的几个关键要点:
- 首先创建一条TCP连接,连接到 MySQL 服务器,Canal 提供了 BIO 与 Netty 两种实现方式。
- TCP 三次握手后成功建立TCP连接后,需要与 MySQL 进行握手,完成协议约定,客户端登录校验等,例如握手实现代码见:MysqlConnector negotiate。
- 一言以蔽之,MySqlConnection 的职责就是实现一个 MySQL 客户端。其效果等同于实现我们常用的 SQL 连接客户端,关于这方面的编程其实不难,如果大家有志成为一名数据库中间件方面的技术人员,只需按照 MySQL 官方文档中有关通讯协议即可。
Step32:发送心跳包,这里的关键实现点如下:
- 利用 Timer 实现定时调度,心跳包发送间隔通过 detectingIntervalInSeconds 指定。
- 心跳包主要是构建一个 CanalEntry.Entry,其类型为EntryType.HEARTBEAT。
心跳包并不是发送给远端 MySQL 服务器,而是将 Entry 下发到 EventSink 组件。 - 该心跳包的用意合作,在这里先留一个伏笔,后续文章会一一揭晓。
Step33:执行发送 dump 命令正式从 MySQL 服务器接收 binlog 日志之前的准备工作,具体准备工作如下:
- 首先再创建一条专属数据库连接,主要用于查找 MySQL 的一些配置信息,统称元数据。
- 向 MySQL 服务器发送 show variables like 'binlog_format‘ 语句查询服务端配置的 binlog 格式,MySQL 支持 STATEMENT、ROW、MIXED 三种模式。
- 向 MySQL 服务端发送 show variables like 'binlog_row_image' 语句查询服务器端配置的 binlog_row_image。
扩展阅读:binlog_format 我相信大家都不陌生,对 binlog_row_image 见过的估计比较少,那 binlog_row_image 有何作用呢?
binlog_row_image 主要是在 binlog_format 为 ROW 模式下,控制记录 binlog 事件的方式,binlog 的作用是记录数据的变化,例如 update 请求,需要记录一行记录变化之前的数据以及变化后的数据,在 binlog event 分别用 before 、after 记录变化前后的数据,但有一个问题,是只发生变化的字段的前后值呢,还是记录一行中所有字段修改前后的值呢?故引入了 binlog_row_image,该值支持如下选项:
- full:在 before 与 after 中记录所有字段的值,针对每一个字段,使用 update 来表示该字段是否发生变化,该选项为默认值。
- minimal:在 before 与 after 中只记录发生变化的字段,并且包含能够唯一识一行数据的值,例如主键。
- noblob:在 before 与 after 中记录所有的列值,但 BLOB 与 TEXT 类型的字段列除外(如未更改)。
Step34:向 MySQL 服务端发送 show variables like 'server_id' 语句,查询服务端配置的 serverId。
Step35:通过日志位点管理器获取需要同步的位点,后续会详细展开。
Step36:通过向 MySQL 发送 dump 请求,从服务器接收 binlog 日志,并进行处理,为了提高性能,Canal 支持该过程进行并行化处理,通过 parallel 属性设置是否支持并发,从而引入 disruptor 高性能并发框架,详情后在后续文章中详细解读。
Step37:通过接收到 MySQL 服务端返回的日志并解析为 Canal.Entry 对象,并传输到 EventSink 组件。
上述过程反复执行,持续完成 binlog 日志的解析,实现数据的同步。
4、总结
本文首先结合官方文档了解了 EventParser,但 Canal 的官方手册并不特别详细,故需要我们通过源码去反推 canal instance 中关于 EventParser 有哪些参数,并且这些参数有何意义,是如何工作的。
众所周知,EventParser 的主要职责就是与 MySQL 服务器“打交道”,将自己伪装成 MySQL 服务器的一个从节点,从服务器端接收 binlog 日志,并将二进制流解码成 Canal.Entry,看似简单,但实现起来还是比较困难的,下面这些方面是后续值得我们研究探讨的点:
- 环形缓存区的使用与技巧。
- 实现 MySQL 通讯协议,向 MySQL 发送相关SQL语句并解析返回结果\,具体由 MysqlConnection 对象实现。
- 日志解析位点管理机制。
- 基于GTID、日志位点偏移量两种方式定位 binlog 日志方式。
- dump 命令的发送、高性能设计( disruptor 框架的引入)