本文将对canal的server模块进行分析,跟之前一样,我们带着几个问题来看源码:
- CanalServer有几种使用方式?
- 控制台Admin、客户端client是如何与CanalServer交互的?
- CanalServerWithNetty和CanalServerWithEmbedded究竟有什么关系?
- Canal事件消费的特色协议,异步流式api(get/ack/rollback协议)的设计是如何实现的?
server模块内的结构如下:
主要分为了三个包:
- admin包:
这个包的CanalAdmin接口定义了canalServer上暴露给canal-admin控制台使用的一些服务接口。
上一篇deployer模块解析中提到的CanalAdminController就是实现了CanalAdmin接口(把这个接口的实现放在deployer模块是挺奇怪的)。Admin包中使用了netty作为服务端(CanalAdminWithNetty类中实现),接受控制台Admin的请求,返回当前canalServer的一些运行状态。
- server包:
server模块的核心包,本文重点解析的部分,需要了解CanalServerWithEmbedded 和CanalServerWithNetty。
- spi包:
定义了canalServer的监控内容 通过spi实现,比如项目中的Prometheus子模块实现了监控能力,我们不展开分析。
1. 从CanalServer的架构说起
CanalServer目前支持两种模式:
- serverMode = tcp的Server-Client模式
- serverMode = kafak 或 rocketMQ 的 Server-MQ-Client模式
为了大家能充分理解canalServer的结构,这里精心制作了一个canalServer的架构图(如果觉得这图不错,给本文点个赞吧)。
1.1 Server-Client模式
架构如图所示:
我们可以清楚的看到Server模块中各个模块的关系与能力:
- CanalServerWithEmbedde维护了具体的instance任务,负责对binlog进行订阅、过滤、缓存,就是之前的文章介绍过的parser-sink-store的方式。
- CanalServerWithNetty作为服务端,接收CanalClient的请求,将binlog的消息发送给client。
- CanalAdminWithNetty作为admin的服务器,接收控制台Admin的控制操作、查询状态操作等,启停或显示当前CanalServer以及instance的状态。
1.2 Server-MQ-Client模式
架构如图所示:
主体部分与Server-client模式一致,主要区别如下:
- 不需要CanalServerWithNetty,改为CanalMQProducer投递消息给消息队列。
- 不使用CanalClient,改为MqClient获取消息队列的消息进行消费。
这种模式相比于Server-client模式
- 下游解耦,利用消息队列的特性,可以支持多个客户端广播消费、集群消费、重复消费等。
- 会增加系统的复杂度,增加一些延迟。
具体模式的选择,需要根据具体的使用场景来决定。
2.server包
admin包和spi包都不属于核心逻辑,因此我们重点关注server包的代码。
我们看到,server包下面分为了embedded包、exception包、netty包和几个接口类。
其中,最顶层的设计就要从CanalServer接口入手。
它的实现类有两个,CanalServerWithEmbedded 和 CanalServerWithNetty。
它们之间的区别官方文档给了一些说明。
那么,对于官方文档中提到的Embedded(嵌入式)的自主开发是怎么使用呢?
跟我们上面提到的Server-Client模式和Server-MQ-Client模式完全不同,采用了一种无server的架构,如下图所示。
我们可以看到,这种模式没有了Canal-Server,直接在自己的应用中引入canal,然后使用CanalServerWithEmbedded进行数据抓取和订阅。
当然,这种方式开发成本有点高,一般也不会去这样使用。
对于CanalServerWithEmbedded 和 CanalServerWithNetty,官方文档里面实际上没有解释的特别到位,只讲了区别,没有讲联系。
这两个实现类除了官方文档中说明的区别之外,还有很大的联系。
可以看看我们上文介绍的架构图,对于Server-Client模式下的模块联系
实际上,真正的执行逻辑是在CanalServerWithEmbedded中的,CanalServerWithNetty中持有了CanalServerWithEmbedded对象,委托embedded进行相关逻辑处理,CanalServerWithNetty更多的作用是充当服务端与CanalClient进行交互。
3. CanalServerWithNetty类
下面,我们先看看CanalServerWithNetty类。
3.1 单例构建
使用 private构造器 + 静态内部类 来实现一个单例模式,保证了一个CanalServer内部只有一个CanalServerWithNetty。
同时,我们能看到内部持有一个CanalServerWithEmbedded对象,用来处理相关请求,验证了我们上面的说明。
3.2 启动逻辑 start()
源码如下:
主要流程如下:
- 启动embeddedServer。
- 创建bootstrap实例,设置netty相关配置。
参数NioServerSocketChannelFactory也是Netty的API,接受2个线程池参数,第一个线程池是Accept线程池,第二个线程池是woker线程池,Accept线程池接收到client连接请求后,会将代表client的对象转发给worker线程池处理。这里属于netty的知识,不熟悉的可以暂时不必深究,简单认为netty使用线程来处理客户端的高并发请求即可。
- 构造对应的pipeline,包括解码处理、身份验证、创建netty的 seesionHandler(真正处理客户端请求,seesionHandler的实现是核心逻辑)。
pipeline实际上就是netty对客户端请求的处理器链,可以类比JAVA EE编程中Filter的责任链模式,上一个filter处理完成之后交给下一个filter处理,只不过在netty中,不再是filter,而是ChannelHandler。
- 启动netty,监听port端口,然后客户端对 这个端口的请求可以被接收到
对于 netty的相关知识 ,本文 不深入展开,简单理解 为一个高性能服务器即可,可以监听 端口请求,并 进行相应的处理。
重点在于sessionHandler的处理。
3.3 逻辑分发SessionHandler类
canalServer的处理逻辑显然都在sessionHandler里面,而这个handler在构建时,传入了embeddedServer。
前面我们提过,serverWithNetty的处理逻辑是委派给embeddedServer的,所以这里就非常顺理成章了,让handler维护embeddedServer实例,进行逻辑处理。
sessionHandler继承了netty的SimpleChannelHandler类,重写了messageReceived方法,接收到不同请求后,委托embeddedServer用不同方法进行处理 。
这个方法里面的代码非常冗长,而本质都是委托给embeddedServer去处理,因此,我们看下主干逻辑即可。
可以看到,根据不同的packet类型,最终都是委托给embeddedServer进行处理,这里只是做一个逻辑的判断和分发。
3.4 CanalServerWithNetty小结
到此,我们已经了解了CanalServerWithNetty是如何启动的。
并且,它的主要定位就是充当服务器,接收客户端的请求,然后做消息分发,委托给CanalServerEmbedded进行处理。
下面,我们来看下CanalServerEmbedded的相关实现。