「从零单排canal 05」 server模块源码解析(二)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 「从零单排canal 05」 server模块源码解析(二)

4. CanalServerEmbedded类


4.1 基本认识


  • 非完全单例模式,这里使用public的构造器,用户还是有机会自己new对象出来的,应用是用来独立引入进行开发的时候使用。
  • 维护了instance的对象容器
  • 继承了CanalServer和CanalService接口


118.jpg


CannalServer接口其实就是就是start()和stop()方法,没有特别的地方,主要是start()配置了一个MigrateMap.makeComputingMap,


当需要某个instance的时候,就会调用apply方法用instanceGenerator创建对应的instance。

119.jpg


我们重点看下CanalService接口定义的方法。

120.jpg


每个方法的入参都带来clientIdentity,这个是客户端的身份标示

121.jpg


目前canal只支持一个客户端对一个instance进行订阅,clientId全部写死为1001,据说以后可能会支持多用户订阅。


了解CanalService定义的方法在CanalServerEmbedded中如何实现,基本也就能看清CanalServerEmbedded的全貌了。


尤其是,你能理解官网wiki中介绍的canal核心功能——异步消费流式api(get/ack/rollback协议) 设计。

122.jpg


4.2 subscribe方法


主要步骤:

  • 根据客户端标识clientIdentity中的destination,找到对应的instance。
  • 通过instance的metaManager记录下当前这个客户端在订阅。
  • 通过instace的metaManage获取当前订阅binlog的position位置。如果是第一次订阅,那么metaManage没有position信息,就从eventStore获取第一个binlog的position,然后更新到metaManager。
  • 通知下订阅关系变化。

123.jpg


这里需要注意一下metaManager,这是一个接口,有多种实现方式,包括基于内存、基于文件、基于内存+zookeeper混合、基于zookeeper等,都在meta模块中,这里就简单了解下概念即可。


  • MemoryMetaManager:位点信息保存在内存中。
  • ZookeeperMetaManage:位点信息保存在zk上。
  • PeriodMixedMetaManager:前面两种的混合,保存在内存中,然后位点信息定期刷新到zk上。


124.jpg


我们在集群模式下,default-instance.xml使用的是基于PeriodMixedMetaManager的实现。


4.3 unsubscribe方法


这个方法比较简单,就不放源码了。


就是找到instance对应的metaManager,然后调用unsubscribe方法取消这个客户端的订阅。


需要注意的是,取消订阅,instance本身仍然是在运行的,可以有新的client来订阅这个instance。


4.4 getWithoutAck方法


先解释几个概念。


我们用的集群版canalServer,默认是使用PeriodMixedMetaManager来管理位点信息,也就是MemoryMetaManager + zookeeperMetaManager。


其中,对于客户端消费instance消息的情况,内部维护了一个对象MemoryClientIdentityBatch进行记录

125.jpg


回到这个方法来说,这个方法用于客户端获取binlog消息,大致流程如下:


  • 根据clientIdentity的destination获取对应的instance
  • 获取到流式数据中的最后一批获取的位置positionRanges(跟batchId有关联,就是上面那个map里面的)
  • 从cananlEventStore里面获取binlog,转化为event。一般是从最后的一个batchId位置开始,如果之前没有batchId,那么就从cursor记录的消费位点开始;如果cursor为空,那只能从eventStore的第一条消息开始。
  • event转化为entry,并生成新的batchId,组合成message返回给客户端


注意在eventStore获取event的时候,用户可以自己设置batchSize和超时时间timeout。为了尽量提高效率,一般一次获取一批binlog,而不是获取一条。这个批次的大小(batchSize)由客户端指定。同时客户端可以指定超时时间,在超时时间内,如果获取到了batchSize的binlog,会立即返回。如果超时了还没有获取到batchSize指定的binlog个数,也会立即返回。特别的,如果没有设置超时时间,如果没有获取到binlog也立即返回。具体eventStore的获取逻辑,我们下次讲到这个模块再展开。


126.jpg


4.5 get方法


这个方法主要是用于客户端获取binlog消息,与getWithoutAck基本一致。


主要区别在于,客户端获取batch后,自动ack,这样相对来说肯定更快,但是无法保证可靠性。


在项目中看起来暂时没有使用,我们就不展开了。


4.6 ack方法


进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。


  • 从metaManager中移除batchId对应的记录
  • 记录已经成功消费到的binlog位置,以便下一次获取的时候可以从这个位置开始
  • 已经ack的数据,在eventStore中清除

127.jpg


4.7 rollback


rollback有两个方法,回滚所有和回滚指定batchId,不过从源码来看,目前回滚指定指定batchId也是回滚所有。


回滚的本质,就是把所有还没ack的batchId都清空,流式api被get但是还没ack的消息会被重新get。


5.canalMQStarter类


在第一节的架构模式中我们分析过了,在启动过程中,如果serverMode选择tcp,会启动canalServerWithNetty,如果serverMode选择了mq,就会启动cannalMQStarter。


所以从模块组成来说,canalMQStarter跟canalServerWithNetty是比较相似的。


canalMQStarter也是委托embeddedCanal做处理,同时委托CanalMQProducer把消息投递到mq集群。


canalServerWithNetty也是委托embeddedCanal做处理,然后通过netty来跟canal-client做交互。


如果我们以后应用中要内嵌embeddedCanal,完全可以参照canalMQStarter和canalServerWithNetty的模式来写。


主要组成如下:


  • 工作线程池executorService,对每个instance起一个worker线程。
  • canalMQWorks,记录了destination(instance的标识)和worker线程的关系。
  • 维持了一个CanalServerWithEmbedded对象。
  • CanalMQProducer投递mq消息。

128.jpg

5.1 start方法


这个方法就是前面canalStarter类里面的start()方法中,对CanalMQStarter.start()的调用。


具体做了三件事情:


  • 获取CanalServerWithEmbedded的单例对象
  • 对应每个instance启动一个worker线程CanalMQRunnable
  • 注册ShutdownHook,退出时关闭线程池和mqProducer

129.jpg


这里主要看看CanalMQRunnable做了些什么。


5.2 CanalMQRunnable


这是一个内部类,就是看看worker里面做了什么

130.jpg


只有一个worker方法,主要逻辑非常清晰:


  • 给自己创建一个身份标识,作为client。
  • 根据destination获取对应instance,如果没有就sleep,等待产生(比如从别的server那边HA过来一个instance)。
  • 构建一个MQ的destination对象,加载相关mq的配置信息,用作mqProducer的入参。
  • 在embeddedCanal中注册这个订阅客户端。
  • 开始运行,并通过embededCanal进行流式get/ack/rollback协议,进行数据消费。

140.jpg


141.jpg


6.总结


回到开头的几个问题,相信文中都已经做了解答。


  • CanalServer有几种使用方式?


可以独立部署(推荐),可以使用Server-Client模式 和 Server-MQ-Client模式两种。


可以内嵌部署开发(embedded,难度较高)。


  • 控制台Admin、客户端client是如何与CanalServer交互的?


控制台Admin通过CanalAdminWithNetty与服务端交互 客户端client通过CanalServerWithNetty与服务端交互。


  • CanalServerWithNetty和CanalServerWithEmbedded究竟有什么关系?


CanalServerWithEmbedded是真正核心逻辑(parser-sink-store)处理的地方 。CanalServerWithNetty持有CanalServerWithEmbedded对象,接收client的请求然后转发给CanalServerWithEmbedded对象处理。


  • Canal事件消费的特色协议,异步流式api(get/ack/rollback协议)的设计是如何实现的?


CanalServerWithEmbedded集成了CanalService接口,实现了具体的get/ack/rollback协议。


目录
相关文章
|
6天前
yolo-world 源码解析(六)(2)
yolo-world 源码解析(六)
16 0
|
6天前
yolo-world 源码解析(六)(1)
yolo-world 源码解析(六)
8 0
|
6天前
yolo-world 源码解析(五)(4)
yolo-world 源码解析(五)
14 0
|
6天前
yolo-world 源码解析(五)(1)
yolo-world 源码解析(五)
29 0
|
6天前
yolo-world 源码解析(二)(2)
yolo-world 源码解析(二)
19 0
|
20天前
|
XML Java Android开发
Android实现自定义进度条(源码+解析)
Android实现自定义进度条(源码+解析)
50 1
|
24天前
|
存储 NoSQL 算法
【Redis技术进阶之路】「底层源码解析」揭秘高效存储模型与数据结构底层实现(字典)(二)
【Redis技术进阶之路】「底层源码解析」揭秘高效存储模型与数据结构底层实现(字典)
36 0
|
6天前
Marker 源码解析(二)(3)
Marker 源码解析(二)
10 0
|
6天前
Marker 源码解析(一)(4)
Marker 源码解析(一)
11 0

推荐镜像

更多