Canal 如何保证数据库库事务的一致性

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Canal 如何保证数据库库事务的一致性

文将主要介绍在 EventParser binlog 日志同步流程中关于环形缓存区的使用技巧。


1、环形缓存区


关系型数据库讲究的是ACID 4个特性,故引入了数据库事务的概念,一个数据库事务中的多条SQL引发的多条数据变更要么全部成功,要么全部失败,即数据的一致性,那同样在数据同步的场景,在解析一个事务的 binlog 日志时,一次数据同步应该至少以事务为单位,一个事务内的所有 Event 应该作为一个批次提交到数据消费端,让消费端有能力一次同步一个事务中的数据,而不是一条一条变更日志的处理,这样容易造成数据不一致。


环形缓存区的引用就是为了解决将一个事务的完整数据一次提交到消费端,既然是多条消息,故一定需要用到缓存,环形缓存区就在这样的背景下被引入。


在 Canal 中关于事务 Event 的环形缓存区实现类为 EventTransactionBuffer。


1.1 类图


EventTransactionBuffer 的类图如下:

990c104a7ae19dd2ee6960490110823f.png

根据类图我们可以到其存储结构还是比较简单的。


  • int bufferSize环形缓存区的长度,默认为 1024,该长度必须为 2 的幂次方,因为对位运算非常友好。
  • int indexMask环形缓存区下标掩码,其值为 bufferSize - 1 ,sequence * indexMask 能快速定位序号 sequence 所在环形缓存区中的具体下标。
  • CannalEntry.Entry[] entries环形缓存区数据数组,即缓存区实际存储数据的内存区域,为数组结构,长度为 bufferSize。
  • AtomicLong putSequence当前写入的序号,每调用 add 方法添加一条数据,该值增加一,可超过缓存区的实际长度。
  • AtomicLong flushSequence当前已处理的数据序号,flushSequence <= putSequence,(putSequence - flushSequence)表示未处理的数据,即缓存区累积的有效数据。
  • TransactionFlushCallback flushCallbackflush 回调函数,这个和环形缓存区本身关系不大,这个与 Canal 特定业务的,环形缓存区中收集到一个完整的事务变更日志列表后,将这部分内容传入业务回调方法,并重新利用这些缓存空间。


环形缓存区的重大要义就是循环利用。


1.2 环形缓存区存储实现


接下来我们通过其 add 方法来看一下环形缓存区的,在研究环形缓存区之前,将结合8个元素的环形缓存区进行讲解。

60ff2b0a3f8a4a4397a82fe34998b3f9.png

EventTransactionBuffer 的 add 方法代码如下:

cb35edbb6d138aba4912d0eae719aa42.png

首先根据 binlog 事件类型来决定是否调用 flush 方法,这个就是实现将一个事务的事务一起提交到消费端,回到环形缓存区的具体实现,我们重点关注 put 方法 与 flush 方法的实现。EventTransactionBuffer#put

a6935dcfa7624bcded4d000c9b70a4c4.png

其实现的核心步骤:


  1. 检测当前环形缓存区是否已满,如果未满,则向缓存区中添加一条数据。添加数据的具体逻辑:
  • 获取下一个写入的序号 next,等于当前已写入的序号 + 1,即 putSequence + 1。
  • 通过 next & indexMask 取得放入 CannalEntry.Entry[] entries 中的下标,与 next % bufferSize 效果等同。


  1. 如果已满,则首先将缓存区中的数据刷新,即将未处理的数据全部抽取,提交到数据消费方,然后释放缓存区,继续添加数据。


关键在于如何判断环形缓存区已满,具体算法如下:

acdfdd002da9c14315c813077078dba6.png

为了加强对这段代码的理解,我举一个示例,在一个8个元素的环形缓存区中,假设一个事务包含5条日志,首先依次写入5条日志,其环形缓存区如下:

ee9569d3d2a9a2a3994a26e92d38d70b.png

此时 putSequence 为 4,flushSequence 为 -1 ,我们应该能发现,在第一轮时,由于 sequeue 小于 bufferSize ,如果不执行 flush 操作,连续写入 8条数据,sequence = 7 时,sequence - bufferSize > flushSequence 这个表达式都不会满足,即代表缓存区未满,但在写入第9条消息时,sequence = 8 ,此时 sequence - bufferSize > flushSequence 已满足,即缓存区已满,需要先刷新数据,然后才能再填充。


再回到本示例中,一个事务只包含5条日志,在写满 5条日志后会即调用 flush 方法,将环形缓存区中下标为 0~4 的消息传入数据消费方,在 Canal 中会将这批消息一次传入 EventSink 组件。执行完 flush 方法后,flushSequence 等于4,其环形缓存区如下图所示:

98f3d7a3898d4204865910a90db379a9.png

此时 putSequence = flushSequence = 4,那这个时候环形缓存区的容量是多少呢?其实就是又恢复到 bufferSize 了,那我们怎么计算环形缓存区当前已写入的消息呢?其实很简单,putSequence - flushSequence 表示已写入的元素数量。那当前剩余容量就等于 bufferSize - (putSequence - flushSequence ),即只需要 bufferSize - (putSequence - flushSequence ) > 0 就表示有剩余空闲。有了这一层思路,就能明白 checkFreeSlotAt 的算法,这也是环形缓存区的核心所在。


思考,Canal 基于环形缓存区的实现,一定能保证一个事务的所有变更日志都一次提交到 EventSink 组件吗,大家可以简单思考一下,在文末的总结部分有笔者的思考。


答案是否定的,如果一个事务包含的日志条目超过了环形缓存区的长度,为了保证数据不丢失,会首先将环形缓存区的数据全部提交,然后接收新的数据,这样一个事务中的消息会被分成多次提交到 EventSink。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
3月前
|
SQL 关系型数据库 MySQL
乐观锁在分布式数据库中如何与事务隔离级别结合使用
乐观锁在分布式数据库中如何与事务隔离级别结合使用
|
12天前
|
SQL 关系型数据库 MySQL
乐观锁在分布式数据库中如何与事务隔离级别结合使用
乐观锁在分布式数据库中如何与事务隔离级别结合使用
|
2月前
|
canal 缓存 NoSQL
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
根据对一致性的要求程度,提出多种解决方案:同步删除、同步删除+可靠消息、延时双删、异步监听+可靠消息、多重保障方案
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
|
3月前
|
消息中间件 缓存 监控
如何保证缓存和数据库的一致性?
保证缓存和数据库的一致性的做法
|
5天前
|
数据库
什么是数据库的事务隔离级别,有什么作用
【10月更文挑战第21】什么是数据库的事务隔离级别,有什么作用
10 3
|
5天前
|
存储 关系型数据库 数据挖掘
什么是数据库的事务隔离级别
【10月更文挑战第21】什么是数据库的事务隔离级别
9 1
|
10天前
|
存储 数据库 数据库管理
数据库事务安全性控制如何实现呢
【10月更文挑战第15天】数据库事务安全性控制如何实现呢
|
10天前
|
存储 数据库 数据库管理
什么是数据库事务安全性控制
【10月更文挑战第15天】什么是数据库事务安全性控制
|
10天前
|
供应链 数据库
数据库事务安全性控制有什么应用场景吗
【10月更文挑战第15天】数据库事务安全性控制有什么应用场景吗
|
10天前
|
存储 关系型数据库 MySQL
数据库的事务控制
【10月更文挑战第15天】数据库的事务控制
15 2