「从零单排canal 05」 server模块源码解析(二)

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 「从零单排canal 05」 server模块源码解析(二)

4. CanalServerEmbedded类


4.1 基本认识


  • 非完全单例模式,这里使用public的构造器,用户还是有机会自己new对象出来的,应用是用来独立引入进行开发的时候使用。
  • 维护了instance的对象容器
  • 继承了CanalServer和CanalService接口


118.jpg


CannalServer接口其实就是就是start()和stop()方法,没有特别的地方,主要是start()配置了一个MigrateMap.makeComputingMap,


当需要某个instance的时候,就会调用apply方法用instanceGenerator创建对应的instance。

119.jpg


我们重点看下CanalService接口定义的方法。

120.jpg


每个方法的入参都带来clientIdentity,这个是客户端的身份标示

121.jpg


目前canal只支持一个客户端对一个instance进行订阅,clientId全部写死为1001,据说以后可能会支持多用户订阅。


了解CanalService定义的方法在CanalServerEmbedded中如何实现,基本也就能看清CanalServerEmbedded的全貌了。


尤其是,你能理解官网wiki中介绍的canal核心功能——异步消费流式api(get/ack/rollback协议) 设计。

122.jpg


4.2 subscribe方法


主要步骤:

  • 根据客户端标识clientIdentity中的destination,找到对应的instance。
  • 通过instance的metaManager记录下当前这个客户端在订阅。
  • 通过instace的metaManage获取当前订阅binlog的position位置。如果是第一次订阅,那么metaManage没有position信息,就从eventStore获取第一个binlog的position,然后更新到metaManager。
  • 通知下订阅关系变化。

123.jpg


这里需要注意一下metaManager,这是一个接口,有多种实现方式,包括基于内存、基于文件、基于内存+zookeeper混合、基于zookeeper等,都在meta模块中,这里就简单了解下概念即可。


  • MemoryMetaManager:位点信息保存在内存中。
  • ZookeeperMetaManage:位点信息保存在zk上。
  • PeriodMixedMetaManager:前面两种的混合,保存在内存中,然后位点信息定期刷新到zk上。


124.jpg


我们在集群模式下,default-instance.xml使用的是基于PeriodMixedMetaManager的实现。


4.3 unsubscribe方法


这个方法比较简单,就不放源码了。


就是找到instance对应的metaManager,然后调用unsubscribe方法取消这个客户端的订阅。


需要注意的是,取消订阅,instance本身仍然是在运行的,可以有新的client来订阅这个instance。


4.4 getWithoutAck方法


先解释几个概念。


我们用的集群版canalServer,默认是使用PeriodMixedMetaManager来管理位点信息,也就是MemoryMetaManager + zookeeperMetaManager。


其中,对于客户端消费instance消息的情况,内部维护了一个对象MemoryClientIdentityBatch进行记录

125.jpg


回到这个方法来说,这个方法用于客户端获取binlog消息,大致流程如下:


  • 根据clientIdentity的destination获取对应的instance
  • 获取到流式数据中的最后一批获取的位置positionRanges(跟batchId有关联,就是上面那个map里面的)
  • 从cananlEventStore里面获取binlog,转化为event。一般是从最后的一个batchId位置开始,如果之前没有batchId,那么就从cursor记录的消费位点开始;如果cursor为空,那只能从eventStore的第一条消息开始。
  • event转化为entry,并生成新的batchId,组合成message返回给客户端


注意在eventStore获取event的时候,用户可以自己设置batchSize和超时时间timeout。为了尽量提高效率,一般一次获取一批binlog,而不是获取一条。这个批次的大小(batchSize)由客户端指定。同时客户端可以指定超时时间,在超时时间内,如果获取到了batchSize的binlog,会立即返回。如果超时了还没有获取到batchSize指定的binlog个数,也会立即返回。特别的,如果没有设置超时时间,如果没有获取到binlog也立即返回。具体eventStore的获取逻辑,我们下次讲到这个模块再展开。


126.jpg


4.5 get方法


这个方法主要是用于客户端获取binlog消息,与getWithoutAck基本一致。


主要区别在于,客户端获取batch后,自动ack,这样相对来说肯定更快,但是无法保证可靠性。


在项目中看起来暂时没有使用,我们就不展开了。


4.6 ack方法


进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。


  • 从metaManager中移除batchId对应的记录
  • 记录已经成功消费到的binlog位置,以便下一次获取的时候可以从这个位置开始
  • 已经ack的数据,在eventStore中清除

127.jpg


4.7 rollback


rollback有两个方法,回滚所有和回滚指定batchId,不过从源码来看,目前回滚指定指定batchId也是回滚所有。


回滚的本质,就是把所有还没ack的batchId都清空,流式api被get但是还没ack的消息会被重新get。


5.canalMQStarter类


在第一节的架构模式中我们分析过了,在启动过程中,如果serverMode选择tcp,会启动canalServerWithNetty,如果serverMode选择了mq,就会启动cannalMQStarter。


所以从模块组成来说,canalMQStarter跟canalServerWithNetty是比较相似的。


canalMQStarter也是委托embeddedCanal做处理,同时委托CanalMQProducer把消息投递到mq集群。


canalServerWithNetty也是委托embeddedCanal做处理,然后通过netty来跟canal-client做交互。


如果我们以后应用中要内嵌embeddedCanal,完全可以参照canalMQStarter和canalServerWithNetty的模式来写。


主要组成如下:


  • 工作线程池executorService,对每个instance起一个worker线程。
  • canalMQWorks,记录了destination(instance的标识)和worker线程的关系。
  • 维持了一个CanalServerWithEmbedded对象。
  • CanalMQProducer投递mq消息。

128.jpg

5.1 start方法


这个方法就是前面canalStarter类里面的start()方法中,对CanalMQStarter.start()的调用。


具体做了三件事情:


  • 获取CanalServerWithEmbedded的单例对象
  • 对应每个instance启动一个worker线程CanalMQRunnable
  • 注册ShutdownHook,退出时关闭线程池和mqProducer

129.jpg


这里主要看看CanalMQRunnable做了些什么。


5.2 CanalMQRunnable


这是一个内部类,就是看看worker里面做了什么

130.jpg


只有一个worker方法,主要逻辑非常清晰:


  • 给自己创建一个身份标识,作为client。
  • 根据destination获取对应instance,如果没有就sleep,等待产生(比如从别的server那边HA过来一个instance)。
  • 构建一个MQ的destination对象,加载相关mq的配置信息,用作mqProducer的入参。
  • 在embeddedCanal中注册这个订阅客户端。
  • 开始运行,并通过embededCanal进行流式get/ack/rollback协议,进行数据消费。

140.jpg


141.jpg


6.总结


回到开头的几个问题,相信文中都已经做了解答。


  • CanalServer有几种使用方式?


可以独立部署(推荐),可以使用Server-Client模式 和 Server-MQ-Client模式两种。


可以内嵌部署开发(embedded,难度较高)。


  • 控制台Admin、客户端client是如何与CanalServer交互的?


控制台Admin通过CanalAdminWithNetty与服务端交互 客户端client通过CanalServerWithNetty与服务端交互。


  • CanalServerWithNetty和CanalServerWithEmbedded究竟有什么关系?


CanalServerWithEmbedded是真正核心逻辑(parser-sink-store)处理的地方 。CanalServerWithNetty持有CanalServerWithEmbedded对象,接收client的请求然后转发给CanalServerWithEmbedded对象处理。


  • Canal事件消费的特色协议,异步流式api(get/ack/rollback协议)的设计是如何实现的?


CanalServerWithEmbedded集成了CanalService接口,实现了具体的get/ack/rollback协议。


目录
相关文章
|
2天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
2天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
2天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
3天前
|
安全 搜索推荐 数据挖掘
陪玩系统源码开发流程解析,成品陪玩系统源码的优点
我们自主开发的多客陪玩系统源码,整合了市面上主流陪玩APP功能,支持二次开发。该系统适用于线上游戏陪玩、语音视频聊天、心理咨询等场景,提供用户注册管理、陪玩者资料库、预约匹配、实时通讯、支付结算、安全隐私保护、客户服务及数据分析等功能,打造综合性社交平台。随着互联网技术发展,陪玩系统正成为游戏爱好者的新宠,改变游戏体验并带来新的商业模式。
|
1月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
76 2
|
2月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
81 0
|
2月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
65 0
|
2月前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
69 0
|
2月前
|
安全 Java 程序员
Collection-Stack&Queue源码解析
Collection-Stack&Queue源码解析
92 0
|
26天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
53 12

热门文章

最新文章

推荐镜像

更多