「从零单排canal 07」 parser模块源码解析(一)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 「从零单排canal 07」 parser模块源码解析(一)

本文将对canal的binlog订阅模块parser进行分析。


parser模块(绿色部分)在整个系统中的角色如下图所示,用来订阅binlog事件,然后通过sink投递到store。

14.png


parser模块应该来说是整个项目里面比较复杂的模块,代码非常多。


因此,本文根据过程中的主线来进行展开分析,从 启动 开始,进行分析。

如果读者有其他相关内容不明白的,可以给我留言,我会进行解答或者根据情况再单独写相关内容。


模块内的类如下:

15.png


重点需要关注几个核心问题


  • 如何抓取binlog
  • 对binlog消息处理做了怎样的性能优化
  • 如何控制位点信息
  • 如何兼容阿里云RDS的高可用模式下的主备切换问题


1.从启动进入parser主流程


前面的文章我们已经提到了,instance启动的是,会按照顺序启动instance的各个模块

16.png


parser模块就是在这里开始的。


这里需要注意一下,在beforeStartEventParser方法中,启动了parser的两个相关组件CanalLogPositionManager 和 CanalHAController,这里先分别介绍一下。

  • CanalLogPositionManager:管理位点信息
  • CanalHAController:instance连接源数据库的心跳检测,并实现数据库的HA(如果配置了standby的数据库)


1.1 位点信息管理CanalLogPositionManager


我们用的是default-instance.xml的配置,所以实际实现类是FailbackLogPositionManager

17.png


这里构造器有两个入参,一个是primary的MemoryLogPositionManager,一个是second的MetaLogPositionManager。


前者是内存的位点信息,后者我们我们看一下构造器的metaManager是基于zk的位点信息管理器。

18.png


所以FailbackLogPositionManager逻辑也比较简单,获取位点信息时,先尝试从内存memory中找到lastest position,如果不存在才尝试找一下zookeeper里的位点信息。

19.png


1.2 心跳控制器CanalHAController


我们用的是default-instance.xml的配置,所以实际实现类是HeartBeatHAController

20.png


HeartBeatHAController里面没有特别复杂的逻辑,就是实现了心跳检测成功的onSuccess方法和onFail方法。另外维护了一个CanalHASwitchable对象,一旦心跳检测失败超过一定次数,就执行doSwitch()进行主备切换。

21.png


前提是我们要设置了主备数据库的连接信息。


这里的代码写的真的是有点混乱,居然是用MysqlEventParser实现了这个doSwitch()方法。


另外,在MysqlEventParser中,写了一个MysqlDetectingTimeTask内部类,集成了TimerTask来做定时心跳检测。


  • 定时去连接数据库,可以通过select\desc\show\explain等方法做存活检测
  • 如果检测成功,就调用HeartBeatHAController的onSuccess方法
  • 如果失败,就HeartBeatHAController的onFail方法
  • 如果失败超过一定次数,onFail方法中就会调用doSwitch方法进行主备切换


2.核心逻辑


从parser.start()进去后,我们就来到了parser的核心。


从default-instance.xml配置文件看,默认的parser实现是从base-instance.xml的baseEventParser来的,用的是RdsBinlogEventParserProxy类。

22.png


我们看下这个类图的结构。

23.png


从这个结构来看,我们从上到下,就能对parser模块的主体逻辑进行抽丝剥茧了。


Let’s go!


2.1 CanalEventParser接口


定义了一个空的接口


2.2 AbstractEventParser抽象类


这个类里面代码非常多,我们重点关注核心流程。


2.2.1 构造器AbstractEventParser()


构造器里面就只做了一件事情,创建了一个EventTransactionBuffer。

24.png


EventTransactionBuffer这个类顾名思义就是一个缓冲buffer,它的作用源码里的注释也很清楚,它是缓冲event队列,提供按事务刷新数据的机制。


那对于这里构造器中实现的TransactionFlushCallback的flush(List<CanalEntry.Entry> transaction) 方法,肯定就是对于事务中的一系列event,刷新到store中。


我们可以看下consumeTheEventAndProfilingIfNecessary(transaction)方法,跟我们想的一样,具体的sink方法放在后面的sink模块再展开分析。

25.png

2.2.2 主干的start()方法


主要做了这些事情:


  • 初始化缓冲队列transactionBuffer
  • 初始化binlogParser
  • 启动一个新的线程进行核心工作
  • 构造Erosa连接ErosaConnection
  • 利用ErosaConnection启动一个心跳线程
  • 执行dump前的准备工作,查看数据库的binlog_format和binlog_row_image,准备一下DatabaseTableMeta
  • findStartPosition获取最后的位置信息(挺重要的,具体实现在MysqlEventParser)
  • 构建一个sinkHandler,实现具体的sink逻辑(我们可以看到,里面就是把单个event事件写入到transactionBuffer中)
  • 开始dump过程,默认是parallel处理的,需要构建一个MultiStageCoprocessor;如果不是parallel,就直接用sinkHandler处理。内部while不断循环,根据是否parallel,选择MultiStageCoprocessor或者sinkHandler进行投递。
  • 如果有异常抛出,那么根据异常类型做相关处理,然后退出sink消费,释放一下状态,sleep一段时间后重新开始

代码很长,逻辑比较清晰,就不贴了。


2.2.3 核心dump过程


dump过程默认是parallel处理的,需要构建一个MultiStageCoprocessor;如果不是parallel,就直接用sinkHandler处理。内部while不断循环,根据是否parallel,选择MultiStageCoprocessor或者sinkHandler进行投递。


注意multiStageCoprocessor在这里start启动。


代码如下


26.png


dump方法在MysqlConnection类中实现,主要就是把自己注册到数据库作为一个slave,然后获取binlog变更,具体到协议我们就不展开分析了。


通过fetcher抓取到event,然后调用sink投递到store。


注意,parallel为false的,是单线程交给sinkHandler处理,parallel为true的,交给MultiStageCoprocessor的coprocessor.publish(buffer)处理,后面展开分析下并行处理的逻辑。


注意multiStageCoprocessor在这里publish进行写入RingBuffer,下文会详细讲下这里的机制。

27.png


2.3 AbstractMysqlEventParser抽象类


这个类比较简单,就是做了根据配置做了一些对象创建和设置的工作,比如BinlogParser的构建、filter的设置等


2.4 MysqlEventParser实现类


总共有将近1000行代码,里面其实代码组织有点混乱。像前面提到的MysqlDetectingTimeTask内部类、HeartBeatHAController的部分方法实现,都是在这个类里面的。


那抛开这些来说,这个类的主要功能还是在处理根据journalName、position、timestamp等配置查找对应的binlog位点。


我们选取核心流程里面的关键逻辑 findStartPostion( ) 方法进行分析即可。


这个是AbstractEventParser类中start方法中调用的,获取dump起始位点。


我们默认是使用 非GTID mode记录位点信息的,所以直接看下来看下findStartPositionInternal( ) 具体逻辑,这里可以了解到如何正确配置位点信息:


  • logPositionManager找历史记录
  • 如果没有找到
  • 如果instance没有配置canal.instance.master.journal.name
  • 如果instance配置了canal.instance.master.timestamp,就按照时间戳去binlog查找
  • 如果没有配置timestamp,就返回数据库binlog最新的位点
  • 如果instance配置了canal.instance.master.journal.name
  • 如果instance配置了canal.instance.master.position,那就根据journalName和position获取bingo位点信息
  • 如果配置了timestamp,就用journalName + timestamp形式获取位点信息
  • 如果找到了历史记录
  • 如果历史记录的连接信息和当前连接信息一致,那么判断下是否有异常,没有异常就直接返回
  • 如果历史记录的连接信息和当前连接信息不一致,说明可能发生主备切换,就把历史记录的时间戳回退一分钟,重新查询


这里是纯if else 流程代码,挺长的,就不贴了。


在这个过程中,调用了几个有意思的方法,可以了解一下


  • findServerId( ):查询当前db的serverId信息,mysql命令为 show variables like 'server_id'
  • findEndPosition():查询当前的binlog位置,mysql命令为 show master status
  • findStartPosition():查询当前的binlog位置,mysql命令为 show binlog events limit 1
  • findSlavePosition():查询当前的slave视图的binlog位置,mysql命令为 show slave status


2.5 RdsBinlogEventParserProxy实现类


这个类比较简单,就是canal为阿里云rds定制的一个代理实现类。


主要解决了云rds本身高可用架构下,服务端HA切换后导致的binlog位点信息切换。


所以对于抛出的异常做了一定的处理,兼容了这种服务端HA的情况。


同时,也能满足rds的备份文件指定位点开始增量消费的特性。


主要过程如下


  • 如果抛出了PositionNotFoundException异常,就委托rdsLocalBinlogEventParser进行处理
  • rdsLocalBinlogEventParser会通过下载binlog的oss备份,找到目标binlog文件和位置

28.png

目录
相关文章
|
2天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
2天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
2天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
3天前
|
安全 搜索推荐 数据挖掘
陪玩系统源码开发流程解析,成品陪玩系统源码的优点
我们自主开发的多客陪玩系统源码,整合了市面上主流陪玩APP功能,支持二次开发。该系统适用于线上游戏陪玩、语音视频聊天、心理咨询等场景,提供用户注册管理、陪玩者资料库、预约匹配、实时通讯、支付结算、安全隐私保护、客户服务及数据分析等功能,打造综合性社交平台。随着互联网技术发展,陪玩系统正成为游戏爱好者的新宠,改变游戏体验并带来新的商业模式。
|
1月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
76 2
|
2月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
81 0
|
2月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
65 0
|
2月前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
69 0
|
2月前
|
安全 Java 程序员
Collection-Stack&Queue源码解析
Collection-Stack&Queue源码解析
92 0
|
26天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
53 12

热门文章

最新文章

推荐镜像

更多