概述
要理解canal数据消费过程,必须要知道canal的server端是如何响应client端的请求的,还记得我们之前说过的么,canal的server其实分为对内(CanalServerWithEmbedded)和对外的(CanalServerWithNetty),由CanalServerWithNetty负责响应client端请求,而后由CanalServerWithEmbedded负责进行处理。
server支持的动作包括 subscription(订阅)、unsubscription(取消订阅)、get(获取)、clientAck(确认)、clientRollback(回滚)。
CanalServerWithNetty其实底层依赖的是netty,刚好清明的时候把netty大概看了一遍,所以说很多东西都是凑巧,看了不知道以后什么时候会用上。
ok,最后想说的就是,这篇文章其实就想说清楚一个过程,就是client的get/ack/rollback三个过程,讲清楚这三个我觉得就够了。
canal消费流程

说明
从上图可以看出canal的消费其实由client向nettyServer发起请求,然后转由EmbeddedServer进行处理。
CanalServerWithNetty介绍
nettyServer介绍

说明:
从构造函数可以看出来,在CanalServerWithNetty内部其实放入了一个CanalServerWithEmbedded对象,记得这句话,CanalServerWithNetty内部通过CanalServerWithEmbedded去完整真正的任务处理。
注意bootstrap最后绑定的sessionHandler函数,这是处理函数的核心。
SessionHandler介绍

说明
SessionHandler支持上图中支持的操作,咱们就关注下GET、CLIENTACK、CLIENTROLLBACK这几个动作。
GET介绍
get过程中我们要记得数据是从eventStore里面获取的,eventStore的设计其实是一个环状设计,可以参考《canal 组件介绍》,在这个基础上就可以理解了get的过程。
get的过程其实是一个过程,主要是从store获取数据,然后记录消费位移(也就是get位置)到zk节点,这个记录其实使用了zk的持久化递增节点,这样子能够确保我们每次ack的时候按照顺序进行get。
记录到zk的position位置包括三个关键位移,包括start、ack、end,至于这几个有什么用,暂时我也不清楚,应该就是你获取的这批数据的开始位移(start)、结束位移(end)、确认位移(ack)。
记录的位移的zk节点目录是 /otter/canal/destinations/xxxx/clientId/mark/00000000-00000011,其中xxxx是instance的名字代表同步的数据,clientId代表启动的client端。





CLIENTACK介绍
ack过程其实就是就是client端发送ack消息体到server,server根据ack消息体里面的batchId和clientId去找到get数据时候获取上次记录的位移,通过删除记录节点并记录到新的zk节点完成ack确认,其实我们理解为就是我们删除了待ack的数据代表就完成了ack。
记录的位移的zk节点目录是 /otter/canal/destinations/xxxx/clientId/mark/00000000-00000011,其中xxxx是instance的名字代表同步的数据,clientId代表启动的client端,这个目录我们可以理解消费过程的记录。
最后我们会用单个节点记录消费位移,位置/otter/canal/destinations/xxxx/clientId/cursor,这个我们可以理解为最终结果的位移。


CLIENTROLLBACK介绍
clientRollback的过程其实就是移动位移的过程,也就是在环形数据的get位移恢复到rollback的位置,再次强调一下重点,说三遍。
rollback其实就是一个移动下标的过程而已。
rollback其实就是一个移动下标的过程而已。
rollback其实就是一个移动下标的过程而已。


