前言
文本已收录至我的GitHub仓库,欢迎Star:github.com/bin39232820…
种一棵树最好的时间是十年前,其次是现在
絮叨
canal的入门篇,我已经带大家搭建好了canal 并用Java客服端 去订阅canal 从mysql那边拿到的binlog日志
其实我们生产中的作用也是差不多这么玩的,只是说完善一点、,这篇带大家说说canal的一些原理。
因为这篇是进阶,所以建议大家呢一定要按我前面的把第一个demo做出来,再看第二篇
canal的工作原理:
原理相对比较简单:
- canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
- mysql master收到dump请求,开始推送binary log给slave(也就是canal)
- canal解析binary log对象(原始为byte流)
canal的架构
说明:
- server代表一个canal运行实例,对应于一个jvm
- instance对应于一个数据队列 (1个server对应1..n个instance)
instance模块:
- eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
- eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
- eventStore (数据存储)
- metaManager (增量订阅&消费信息管理器)
EventParser设计
- 第一步 Connection获取上一次解析成功的位置 (如果第一次启动,则获取初始指定的位置或者是当前数据库的binlog位点)
- 第二步 nnection建立链接,发送BINLOG_DUMP指令
- 第三步 Mysql开始推送Binaly Log
- 第四步 接收到的Binaly Log的通过Binlog parser进行协议解析,补充一些特定信息。
- 第五步 传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功
- 第六步 存储成功后,定时记录Binaly Log位置
EventSink设计
- 数据过滤:支持通配符的过滤模式,表名,字段内容等
- 数据路由/分发:解决1:n (1个parser对应多个store的模式)
- 数据归并:解决n:1 (多个parser对应1个store)
- 数据加工:在进入store之前进行额外的处理,比如join
EventStore设计
- 目前仅实现了Memory内存模式,后续计划增加本地file存储,mixed混合模式
- 借鉴了Disruptor的RingBuffer的实现思路
定义了3个cursor
- Put : Sink模块进行数据存储的最后一次写入位置
- Get : 数据订阅获取的最后一次提取位置
- Ack : 数据消费成功的最后一次消费位置
增量订阅/消费设计(还是要第一节的基础,不然很难看懂)
具体的协议格式,可参见:CanalProtocol.proto
get/ack/rollback协议介绍:
Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:
- a. batch id 唯一标识
- b. entries 具体的数据对象,对应的数据对象格式:EntryProtocol.proto
- void rollback(long batchId),顾命思议,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作
- void ack(long batchId),顾命思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作
canal的get/ack/rollback协议和常规的jms协议有所不同,允许get/ack异步处理,比如可以连续调用get多次,后续异步按顺序提交ack/rollback,项目中称之为流式api.
数据对象格式:EntryProtocol.proto
Entry Header logfileName [binlog文件名] logfileOffset [binlog position] executeTime [发生的变更] schemaName tableName eventType [insert/update/delete类型] entryType [事务头BEGIN/事务尾END/数据ROWDATA] storeValue [byte数据,可展开,对应的类型为RowChange] RowChange isDdl [是否是ddl变更操作,比如create table/drop table] sql [具体的ddl sql] rowDatas [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理] beforeColumns [Column类型的数组] afterColumns [Column类型的数组] Column index sqlType [jdbc type] name [column name] isKey [是否为主键] updated [是否发生过变更] isNull [值是否为null] value [具体的内容,注意为文本] 复制代码
可以提供数据库变更前和变更后的字段内容,针对binlog中没有的name,isKey等信息进行补全 可以提供ddl的变更语句
其实canal还可以直接用mq去订阅 这样就不用再写一个Java客户端了
具体参考:配置Canal投递消息到RocketMQ
喜欢的一句话
昨天看书听到了一句非常喜欢发话送给大家:这个是岳麓书院的一幅对联,大家有机会可以去看看
是非审之于己,毁誉听之于人,得失安之于数,成败归之于零,
是是非非由自己的内心来判断,诋毁还是赞誉随别人去说,得到的与失去的都只是天定的。
结尾
canal系列完结了,其实讲的不是很深,但是基本上能自己用了,如果要深入还得靠大家自己,因为我自己也还只是了解层面,感谢大家的支持,下期打算做Java基础吧 ,感觉Java基础,要讲的东西好多呀。正好大家明年面试 哈哈
因为博主也是一个开发萌新 我也是一边学一边写 我有个目标就是一周 二到三篇 希望能坚持个一年吧 希望各位大佬多提意见,让我多学习,一起进步。