Canal 初次启动时如何定位同步位点(文末附流程图)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: Canal 初次启动时如何定位同步位点(文末附流程图)

本文将详细剖析Canal在初次启动时如何定位同步位点,行为思路先源码,再辅以流程图进行说明,并在总结部分使用思维导图进行总结,试图引发各位的讨论。


1、Canal定位启动位点


在一个 Canal Instance 实例启动时,在向 MySQL 发送 dump 命令之前,首先先得计算该从 binlog 的什么位置开始同步,初次启动时如何寻找位点等。其代码如下图所示:

9111ecb11ae50c6f942867183aed987a.png

从这里可以看成,将调用 findStartPosition 方法查找启动时需要从那个位置开始同步 binglog ,该方法是一个抽象方法,具体实现在其子类中,我们将重点关注一下其子类 MysqlEventParser。

72d65a17142c3c999d72a216b214e099.png

在MySQL中定位binlog日志可以分为gtid、binlog文件名+position两种方式,故Canal查找position的方式也分两种情况进行展开,由于篇幅问题,本节将暂不考虑gtid。


这里主要是调用 findStartPositionInternal 方法进行查找位点,这里还有一个标记 needTransactionPosition,表示查出来的位点是不是一个事务的开始或结束。

接下来重点探讨 Canal在启动时如何定位解析位点的。


1.1 查找位点


513164fda7c7006c5ff18b06c98a301c.png

Step1:使用位点存储管理器中查看已解析过的位点数据,Canal 提供了多种日志管理实现,这部分稍后会详细展开。

7b472518f1427a33547f6e09c1118701.png

Step2:这里分如下两种情况


  • 如果日志位点管理器(LogPositionManager)中并未存储相关的位点信息,例如初次启动时的处理逻辑。
  • 如果日志位点管理器中已存储相关的位点信息的处理逻辑。


由于初次启动时日志位点管理器并没有存储其位点信息,故我们先看位点管理器并未存储位点的情况。


c66c9822fe8f43770cf1da95e5bfda08.png

Step3:如果当前连接的是主节点,则尝试使用 masterPosition,如果当前连接的是从节点(发生了切换),即使用 standbyPosition,那这两个位点信息是从哪来的的呢?原来在 Canal Instance 实例启动之前,可以手动通过 positions 属性手动设置开始解析位点。

c9e87d182c76c98341b3587965b88400.png

Step4:如果在启动时未手动设置初始解析位点,则从当前 binlog 日志最后的位点开始同步,其实现原理是向 MySQL 服务器发送 show master status\G 命令,其命令输出结果如下图所示:

d104d280049f183e75929bb127af60ff.png

接下来再关注一下如果从日志位点管理器中查找到位点的处理逻辑,在进入该流程的探究之前,先看一下表示位点的实体类,一睹其结构。

d90dce9e5f7d074a861f1b7e2bfec920.png

会在 LogIdentity 中记录该日志位点是由哪个 slaveId 以及所连接的 MySQL 服务器信息。

31a68a0a5c51ea142cd79888b47500aa.png

Step5:如果从日志位点管理器中查询到位点,则需要判断当前连接的服务器地址与日志位点中记录的是否一致,如果不一致则说明发生了故障切换,为了确保数据不丢失,提供了回退时间的机制,其具体实现关键点如下:


  • 如果解析 dump 出现的次数超过其阔值,可能是基于VIP模式发生了漂移,此时可以根据 serverId 来判断是否发生了切换,如何切换了,则按时间回退来重新寻找位点。
  • 如果查找到的位点连接的信息与当前连接的信息不符合,说明发生了切换,则需要回退指定的时间,即根据时间区重新定位位点,至于回退多久的时间,可以通过参数 fallbackIntervalInSeconds 进行设置,默认为 60s。


Canal Instance 启动时如何定位同步位点的流程就介绍到这里了,接下来我们再来看一下 Canal 如何基于时间戳来定位 binlog 位点。


为了流程的完整性,在学习如何根据时间戳查找binlog位点之前,我们先来看一下从位点管理器中查询到对应的位点信息后的处理流程。

7856c050ff1dd54df76303834fe0ce69.png

如果从位点管理器中查询到位点信息,首先判断当前连接的MySQL服务器(主或从)与位点信息是否一致,如果不一致,说明发生了主从切换,为了保证数据的完整性,需要对位点进行前移,默认为回退到60s之前的位点,


1.2 基于时间戳从查找 binlog 位点


基于时间戳查找 binlog 位点的实现方法为 MysqlEventParser 的 findByStartTimeStamp,接下来我们来看一下其实现原理。

7736fbad5360040df9a902d3b8ffae31.png

Step1:首先先查询最大的位点与最小位点,最小位点可发送SQL:show binlog events limit 1。

afbbf5e8849e6b560b5425a2fd5aa44a.png

Step2:然后从最后一个文件开始,尝试根据开始时间戳进行日志查找,等下会详细介绍如果从一个binlog日志定位 endposition。

86dcaf6cbaacdf7c9cd2cf90aafe0dad.png

Step3:如果找到一个合适的endposition,则结束寻找。如果没有找到一个合适的endposition,则尝试向前一个文件进行解析,首先解析出要查找的最小文件的名称,例如(mysql-bin.000036),从文件名称序号,然后减1,再判断该文件名是否小于这次可查找的最小文件名,如果不大于,则向前继续选择,否则结束查找,返回null。


接下来我们看一下如果在一个binlog文件中根据时间戳查找合适的位点。

48edce6fe693b6952266eaeb0fa46066.png


通过向 MySQL Master 发送 dump 命令,建立连接,一条一条从 binlog 日志中解析事件,一条日志日志进行匹配,每从master获取一个logevent,调用 SinkFunction 的 seek 方法。

30cb50392c06dab016b4375b7964471e.png


Step1:如果 justForPositionTimestamp 参数为 true,表示在查询位点时只考虑时间戳,并不考虑事务,在按开始时间戳寻找的方法中该参数为 false,即不会进入该方法。

5100ac72bcf3358f433c93471044a6ea.png

Step2:获取当前日志的基本信息,例如所在的binlog日志文件、日志偏移量、日志写入时间戳、master serverId。

23a1893a8a23b866c678c94cb76cf0a1.png

Step3:如果记录日志的时间戳大于等于待查找的时间戳,返回 false,停止在文件中的停止,是否继续查找其他文件取决在在当前文件中是否已查到符合条件的日志(LogEvent),即是否查找到小于或等于要查找的时间戳。


温馨提示:按照时间戳去查找,其设计理念就是查找小于待查找时间戳中的最大时间戳的LogEvent。

66cc3f07666e345ae123bec041cea962.png

Step4:如果当前的解析的日志偏移量小于此次待查找的最大偏移量,同样结束本文件的查找(针对查找的第一个文件)。因为在查询的时候,首先会查询当前最大偏移量,即查找时的快照,新的内容不在本次查找范围内。

3b8862b9119a2891c21927678e493461.png

Step5:重点查找事件类型为TRANSACTIONEND与TRANSACTIONBEGIN ,即事务结束与事务开始的事件,并将其存储在 logPostion 中,表示该文件中满足查找条件的事件,但并不是只要找到一条就退出,而是继续向后找,直到找到最合适的事件。


由于源码剖析不够直观,为了更好的理解按照时间戳查找日志位点,再给出其流程图,如下:

9b19959d31e8fa662afff0b928d297c6.png


2、总结


阅读源码不那么直观,故先来一张流程图对其进行一个总结,辅助大家了解定位位点的核心步骤。

f3d1a095250055593a61534c7eca1145.png



相关文章
|
C#
如何解决在PotPlayer中看视频音画不同步的问题(C#视频可用)
如何解决在PotPlayer中看视频音画不同步的问题(C#视频可用)
1053 0
|
canal 消息中间件 存储
手把手告诉你如何监听 MySQL binlog 实现数据变化后的实时通知!
Hello 大家好,我是阿粉。不知道大家在日常的工作中有没有遇到这样的场景,很多时候业务数据有变更需要及时加载到缓存、ES 或者发送到消息队列中通知下游服务。
5579 0
手把手告诉你如何监听 MySQL binlog 实现数据变化后的实时通知!
|
4月前
|
Java 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何在程序因故停掉后能从之前的Binlog位置继续读取
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
SQL 缓存 算法
实时计算 Flink版产品使用合集之可以把初始同步完了用增量模式,但初始数据还是要同步,除非初始的数据同步换成用其他工具先同步过去吧,是这个意思吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
SQL Java 数据处理
实时计算 Flink版产品使用合集之在原先的5个表中增加1个表,并且希望在重新启动时能够保留之前的状态,应该选择怎么启动
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
消息中间件 Java Kafka
启动多个jar包来监听同一个数据库的binlog
【2月更文挑战第27天】启动多个jar包来监听同一个数据库的binlog
84 8
|
SQL 关系型数据库 MySQL
如何判断mysql主从是否同步
如何判断mysql主从是否同步
235 0
|
canal 消息中间件 SQL
Canal源码分析之启动时处理逻辑和主备切换机制
Canal源码分析之启动时处理逻辑和主备切换机制
330 0
|
缓存 监控 测试技术
<5>SpringcloudConfig中configClient端不重启项目,手动刷新同步
上一篇博客搭建ConfigClient端写了configClient端从configServer端获取到数据,但是它会放到缓存中,不重启服务是不会重新去configClient获取的。
|
SQL 存储 关系型数据库
PostgreSQL 流复制搭建主从环境,同步和异步的解释,压力测试,主从角色切换|学习笔记
快速学习PostgreSQL 流复制搭建主从环境,同步和异步的解释,压力测试,主从角色切换
PostgreSQL 流复制搭建主从环境,同步和异步的解释,压力测试,主从角色切换|学习笔记