4. CanalServerEmbedded类
4.1 基本认识
- 非完全单例模式,这里使用public的构造器,用户还是有机会自己new对象出来的,应用是用来独立引入进行开发的时候使用。
- 维护了instance的对象容器
- 继承了CanalServer和CanalService接口
CannalServer接口其实就是就是start()和stop()方法,没有特别的地方,主要是start()配置了一个MigrateMap.makeComputingMap,
当需要某个instance的时候,就会调用apply方法用instanceGenerator创建对应的instance。
我们重点看下CanalService接口定义的方法。
每个方法的入参都带来clientIdentity,这个是客户端的身份标示
目前canal只支持一个客户端对一个instance进行订阅,clientId全部写死为1001,据说以后可能会支持多用户订阅。
了解CanalService定义的方法在CanalServerEmbedded中如何实现,基本也就能看清CanalServerEmbedded的全貌了。
尤其是,你能理解官网wiki中介绍的canal核心功能——异步消费流式api(get/ack/rollback协议) 设计。
4.2 subscribe方法
主要步骤:
- 根据客户端标识clientIdentity中的destination,找到对应的instance。
- 通过instance的metaManager记录下当前这个客户端在订阅。
- 通过instace的metaManage获取当前订阅binlog的position位置。如果是第一次订阅,那么metaManage没有position信息,就从eventStore获取第一个binlog的position,然后更新到metaManager。
- 通知下订阅关系变化。
这里需要注意一下metaManager,这是一个接口,有多种实现方式,包括基于内存、基于文件、基于内存+zookeeper混合、基于zookeeper等,都在meta模块中,这里就简单了解下概念即可。
- MemoryMetaManager:位点信息保存在内存中。
- ZookeeperMetaManage:位点信息保存在zk上。
- PeriodMixedMetaManager:前面两种的混合,保存在内存中,然后位点信息定期刷新到zk上。
我们在集群模式下,default-instance.xml使用的是基于PeriodMixedMetaManager的实现。
4.3 unsubscribe方法
这个方法比较简单,就不放源码了。
就是找到instance对应的metaManager,然后调用unsubscribe方法取消这个客户端的订阅。
需要注意的是,取消订阅,instance本身仍然是在运行的,可以有新的client来订阅这个instance。
4.4 getWithoutAck方法
先解释几个概念。
我们用的集群版canalServer,默认是使用PeriodMixedMetaManager来管理位点信息,也就是MemoryMetaManager + zookeeperMetaManager。
其中,对于客户端消费instance消息的情况,内部维护了一个对象MemoryClientIdentityBatch进行记录
回到这个方法来说,这个方法用于客户端获取binlog消息,大致流程如下:
- 根据clientIdentity的destination获取对应的instance
- 获取到流式数据中的最后一批获取的位置positionRanges(跟batchId有关联,就是上面那个map里面的)
- 从cananlEventStore里面获取binlog,转化为event。一般是从最后的一个batchId位置开始,如果之前没有batchId,那么就从cursor记录的消费位点开始;如果cursor为空,那只能从eventStore的第一条消息开始。
- event转化为entry,并生成新的batchId,组合成message返回给客户端
注意在eventStore获取event的时候,用户可以自己设置batchSize和超时时间timeout。为了尽量提高效率,一般一次获取一批binlog,而不是获取一条。这个批次的大小(batchSize)由客户端指定。同时客户端可以指定超时时间,在超时时间内,如果获取到了batchSize的binlog,会立即返回。如果超时了还没有获取到batchSize指定的binlog个数,也会立即返回。特别的,如果没有设置超时时间,如果没有获取到binlog也立即返回。具体eventStore的获取逻辑,我们下次讲到这个模块再展开。
4.5 get方法
这个方法主要是用于客户端获取binlog消息,与getWithoutAck基本一致。
主要区别在于,客户端获取batch后,自动ack,这样相对来说肯定更快,但是无法保证可靠性。
在项目中看起来暂时没有使用,我们就不展开了。
4.6 ack方法
进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
- 从metaManager中移除batchId对应的记录
- 记录已经成功消费到的binlog位置,以便下一次获取的时候可以从这个位置开始
- 已经ack的数据,在eventStore中清除
4.7 rollback
rollback有两个方法,回滚所有和回滚指定batchId,不过从源码来看,目前回滚指定指定batchId也是回滚所有。
回滚的本质,就是把所有还没ack的batchId都清空,流式api被get但是还没ack的消息会被重新get。
5.canalMQStarter类
在第一节的架构模式中我们分析过了,在启动过程中,如果serverMode选择tcp,会启动canalServerWithNetty,如果serverMode选择了mq,就会启动cannalMQStarter。
所以从模块组成来说,canalMQStarter跟canalServerWithNetty是比较相似的。
canalMQStarter也是委托embeddedCanal做处理,同时委托CanalMQProducer把消息投递到mq集群。
canalServerWithNetty也是委托embeddedCanal做处理,然后通过netty来跟canal-client做交互。
如果我们以后应用中要内嵌embeddedCanal,完全可以参照canalMQStarter和canalServerWithNetty的模式来写。
主要组成如下:
- 工作线程池executorService,对每个instance起一个worker线程。
- canalMQWorks,记录了destination(instance的标识)和worker线程的关系。
- 维持了一个CanalServerWithEmbedded对象。
- CanalMQProducer投递mq消息。
5.1 start方法
这个方法就是前面canalStarter类里面的start()方法中,对CanalMQStarter.start()的调用。
具体做了三件事情:
- 获取CanalServerWithEmbedded的单例对象
- 对应每个instance启动一个worker线程CanalMQRunnable
- 注册ShutdownHook,退出时关闭线程池和mqProducer
这里主要看看CanalMQRunnable做了些什么。
5.2 CanalMQRunnable
这是一个内部类,就是看看worker里面做了什么
只有一个worker方法,主要逻辑非常清晰:
- 给自己创建一个身份标识,作为client。
- 根据destination获取对应instance,如果没有就sleep,等待产生(比如从别的server那边HA过来一个instance)。
- 构建一个MQ的destination对象,加载相关mq的配置信息,用作mqProducer的入参。
- 在embeddedCanal中注册这个订阅客户端。
- 开始运行,并通过embededCanal进行流式get/ack/rollback协议,进行数据消费。
6.总结
回到开头的几个问题,相信文中都已经做了解答。
- CanalServer有几种使用方式?
可以独立部署(推荐),可以使用Server-Client模式 和 Server-MQ-Client模式两种。
可以内嵌部署开发(embedded,难度较高)。
- 控制台Admin、客户端client是如何与CanalServer交互的?
控制台Admin通过CanalAdminWithNetty与服务端交互 客户端client通过CanalServerWithNetty与服务端交互。
- CanalServerWithNetty和CanalServerWithEmbedded究竟有什么关系?
CanalServerWithEmbedded是真正核心逻辑(parser-sink-store)处理的地方 。CanalServerWithNetty持有CanalServerWithEmbedded对象,接收client的请求然后转发给CanalServerWithEmbedded对象处理。
- Canal事件消费的特色协议,异步流式api(get/ack/rollback协议)的设计是如何实现的?
CanalServerWithEmbedded集成了CanalService接口,实现了具体的get/ack/rollback协议。