3.CanalController
前面两个类都是比较清晰的,一个是入口类,一个是启动类,下面来看看核心逻辑所在的CanalController。
这里用了大量的匿名内部类实现接口,看起来有点头大,耐心慢慢剖析一下。
3.1 从构造器开始了解
整体初始化的顺序如下:
- 构建PlainCanalConfigClient,用于用户远程配置的获取
- 初始化全局配置,顺便把instance相关的全局配置初始化一下
- 准备一下canal-server,核心在于embededCanalServer,如果有需要canalServerWithNetty,那就多包装一个(我们serverMode=mq是不需要这个netty的)
- 初始化zkClient
- 初始化ServerRunningMonitors,作为instance 运行节点控制
- 初始化InstanceAction,完成monitor机制。(监控instance配置变化然后调用ServerRunningMonitor进行处理)
这里有几个机制要详细介绍一下。
3.1.1 CanalServer两种模式
canalServer支持两种模式,CanalServerWithEmbedded和CanalServerWithNetty。
在构造器中初始化代码部分如下:
// 3.准备canal server //note: 核心在于embededCanalServer,如果有需要canalServerWithNetty,那就多包装一个(我们serverMode=mq // 是不需要这个netty的) ip = getProperty(properties, CanalConstants.CANAL_IP); //省略一部分。。。 embededCanalServer = CanalServerWithEmbedded.instance(); embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112")); //省略一部分。。。 String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY); if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) { canalServer = CanalServerWithNetty.instance(); canalServer.setIp(ip); canalServer.setPort(port); }
embededCanalServer:类型为CanalServerWithEmbedded
canalServer:类型为CanalServerWithNetty
二者有什么区别呢?
都实现了CanalServer接口,且都实现了单例模式,通过静态方法instance获取实例。
关于这两种类型的实现,canal官方文档有以下描述:
说白了,就是我们可以不必独立部署canal server。在应用直接使用CanalServerWithEmbedded直连mysql数据库进行订阅。
如果觉得自己的技术hold不住相关代码,就独立部署一个canal server,使用canal提供的客户端,连接canal server获取binlog解析后数据。而CanalServerWithNetty是在CanalServerWithEmbedded的基础上做的一层封装,用于与客户端通信。
在独立部署canal server时,Canal客户端发送的所有请求都交给CanalServerWithNetty处理解析,解析完成之后委派给了交给CanalServerWithEmbedded进行处理。因此CanalServerWithNetty就是一个马甲而已。CanalServerWithEmbedded才是核心。
因此,在构造器中,我们看到,
用于生成CanalInstance实例的instanceGenerator被设置到了CanalServerWithEmbedded中,
而ip和port被设置到CanalServerWithNetty中。
关于CanalServerWithNetty如何将客户端的请求委派给CanalServerWithEmbedded进行处理,我们将在server模块源码分析中进行讲解。
3.1.2 ServerRunningMonitor
在CanalController的构造器中,canal会为每一个destination创建一个Instance,每个Instance都会由一个ServerRunningMonitor来进行控制。而ServerRunningMonitor统一由ServerRunningMonitors进行管理。
ServerRunningMonitor是做什么的呢?
我们看下它的属性就了解了。它主要用来记录每个instance的运行状态数据的。
/** * 针对server的running节点控制 */ public class ServerRunningMonitor extends AbstractCanalLifeCycle { private static final Logger logger = LoggerFactory.getLogger(ServerRunningMonitor.class); private ZkClientx zkClient; private String destination; private IZkDataListener dataListener; private BooleanMutex mutex = new BooleanMutex(false); private volatile boolean release = false; // 当前服务节点状态信息 private ServerRunningData serverData; // 当前实际运行的节点状态信息 private volatile ServerRunningData activeData; private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1); private int delayTime = 5; private ServerRunningListener listener; public ServerRunningMonitor(ServerRunningData serverData){ this(); this.serverData = serverData; } //。。。。。 }
在创建ServerRunningMonitor对象时,首先根据ServerRunningData创建ServerRunningMonitor实例,之后设置了destination和ServerRunningListener。
ServerRunningListener是个接口,这里采用了匿名内部类的形式构建,实现了各个接口的方法。
主要为instance在当前server上的状态发生变化时调用。比如要在当前server上启动这个instance了,就调用相关启动方法,如果在这个server上关闭instance,就调用相关关闭方法。
具体的调用逻辑我们后面在启动过程中分析,这里大概知道下构造器中做了些什么就行了,主要就是一些启动、关闭的逻辑。
new Function<String, ServerRunningMonitor>() { public ServerRunningMonitor apply(final String destination) { ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData); runningMonitor.setDestination(destination); runningMonitor.setListener(new ServerRunningListener() { /** * note * 1.内部调用了embededCanalServer的start(destination)方法。 * 这里很关键,说明每个destination对应的CanalInstance是通过embededCanalServer的start方法启动的, * 这样我们就能理解,为什么之前构造器中会把instanceGenerator设置到embededCanalServer中了。 * embededCanalServer负责调用instanceGenerator生成CanalInstance实例,并负责其启动。 * * 2.如果投递mq,还会直接调用canalMQStarter来启动一个destination */ public void processActiveEnter() { //省略具体内容。。。 } /** * note * 1.与开始顺序相反,如果有mqStarter,先停止mqStarter的destination * 2.停止embedeCanalServer的destination */ public void processActiveExit() { //省略具体内容。。。 } /** * note * 在Canalinstance启动之前,destination注册到ZK上,创建节点 * 路径为:/otter/canal/destinations/{0}/cluster/{1},其0会被destination替换,1会被ip:port替换。 * 此方法会在processActiveEnter()之前被调用 */ public void processStart() { //省略具体内容。。。 } /** * note * 在Canalinstance停止前,把ZK上节点删除掉 * 路径为:/otter/canal/destinations/{0}/cluster/{1},其0会被destination替换,1会被ip:port替换。 * 此方法会在processActiveExit()之前被调用 */ public void processStop() { //省略具体内容。。。 } }); if (zkclientx != null) { runningMonitor.setZkClient(zkclientx); } // 触发创建一下cid节点 runningMonitor.init(); return runningMonitor; } }
3.2 canalController的start方法
具体运行逻辑如下:
- 在zk的/otter/canal/cluster目录下根据ip:port创建server的临时节点,注册zk监听器
- 先启动embededCanalServer(会启动对应的监控)
- 根据配置的instance的destination,调用runningMonitor.start() 逐个启动instance
- 如果cannalServer不为空,启动canServer (canalServerWithNetty)
这里需要注意,canalServer什么时候为空?
如果用户选择了serverMode为mq,那么就不会启动canalServerWithNetty,采用mqStarter来作为server,直接跟mq集群交互。canalServerWithNetty只有在serverMode为tcp时才启动,用来跟canal-client做交互。
所以如果以后想把embeddedCanal嵌入自己的应用,可以考虑参考mqStarter的写法。后面我们在server模块中会做详细解析。
public void start() throws Throwable { // 创建整个canal的工作节点 final String path = ZookeeperPathUtils.getCanalClusterNode(registerIp + ":" + port); initCid(path); if (zkclientx != null) { this.zkclientx.subscribeStateChanges(new IZkStateListener() { public void handleStateChanged(KeeperState state) throws Exception { } public void handleNewSession() throws Exception { initCid(path); } @Override public void handleSessionEstablishmentError(Throwable error) throws Exception{ logger.error("failed to connect to zookeeper", error); } }); } // 先启动embeded服务 embededCanalServer.start(); // 尝试启动一下非lazy状态的通道 for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) { final String destination = entry.getKey(); InstanceConfig config = entry.getValue(); // 创建destination的工作节点 if (!embededCanalServer.isStart(destination)) { // HA机制启动 ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination); if (!config.getLazy() && !runningMonitor.isStart()) { runningMonitor.start(); } } //note:为每个instance注册一个配置监视器 if (autoScan) { instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction); } } if (autoScan) { //note:启动线程定时去扫描配置 instanceConfigMonitors.get(globalInstanceConfig.getMode()).start(); //note:这部分代码似乎没有用,目前只能是manager或者spring两种方式二选一 for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) { if (!monitor.isStart()) { monitor.start(); } } } // 启动网络接口 if (canalServer != null) { canalServer.start(); } }
我们重点关注启动instance的过程,也就是ServerRunningMonitor的运行机制,也就是HA启动的关键。
入口在runningMonitor.start()。
- 如果zkClient != null,就用zk进行HA启动
- 否则,就直接processActiveEnter启动,这个我们前面已经分析过了
public synchronized void start() { super.start(); try { /** * note * 内部会调用ServerRunningListener的processStart()方法 */ processStart(); if (zkClient != null) { // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start String path = ZookeeperPathUtils.getDestinationServerRunning(destination); zkClient.subscribeDataChanges(path, dataListener); initRunning(); } else { /** * note * 内部直接调用ServerRunningListener的processActiveEnter()方法 */ processActiveEnter();// 没有zk,直接启动 } } catch (Exception e) { logger.error("start failed", e); // 没有正常启动,重置一下状态,避免干扰下一次start stop(); } }
重点关注下HA启动方式,一般 我们都采用这种模式进行。
在集群模式下,可能会有多个canal server共同处理同一个destination,
在某一时刻,只能由一个canal server进行处理,处理这个destination的canal server进入running状态,其他canal server进入standby状态。
同时,通过监听对应的path节点,一旦发生变化,出现异常,可以立刻尝试自己进入running,保证了instace的 高可用!!
启动的重点还是在initRuning()。
利用zk来保证集群中有且只有 一个instance任务在运行。
- 还构建一个临时节点的路径:/otter/canal/destinations/{0}/running
- 尝试创建临时节点。
- 如果节点已经存在,说明是其他的canal server已经启动了这个canal instance。此时会抛出ZkNodeExistsException,进入catch代码块。
- 如果创建成功,就说明没有其他server启动这个instance,可以创建
private void initRunning() { if (!isStart()) { return; } //note:还是一样构建一个临时节点的路径:/otter/canal/destinations/{0}/running String path = ZookeeperPathUtils.getDestinationServerRunning(destination); // 序列化 byte[] bytes = JsonUtils.marshalToByte(serverData); try { mutex.set(false); /** * note: * 尝试创建临时节点。如果节点已经存在,说明是其他的canal server已经启动了这个canal instance。 * 此时会抛出ZkNodeExistsException,进入catch代码块。 */ zkClient.create(path, bytes, CreateMode.EPHEMERAL); /** * note: * 如果创建成功,就开始触发启动事件 */ activeData = serverData; processActiveEnter();// 触发一下事件 mutex.set(true); release = false; } catch (ZkNodeExistsException e) { /** * note: * 如果捕获异常,表示创建失败。 * 就根据临时节点路径查一下是哪个canal-sever创建了。 * 如果没有相关信息,马上重新尝试一下。 * 如果确实存在,就把相关信息保存下来 */ bytes = zkClient.readData(path, true); if (bytes == null) {// 如果不存在节点,立即尝试一次 initRunning(); } else { activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class); } } catch (ZkNoNodeException e) { /** * note: * 如果是父节点不存在,那么就尝试创建一下父节点,然后再初始化。 */ zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点 initRunning(); } }
那运行中的HA是如何实现的呢,我们回头看一下
zkClient.subscribeDataChanges(path, dataListener);
对destination对应的running节点进行监听,一旦发生了变化,则说明可能其他处理相同destination的canal server可能出现了异常,此时需要尝试自己进入running状态。
dataListener是在ServerRunningMonitor的构造方法中初始化的,
包括节点发生变化、节点被删两种变化情况以及相对应的处理逻辑,如下 :
public ServerRunningMonitor(){ // 创建父节点 dataListener = new IZkDataListener() { /** * note: * 当注册节点发生变化时,会自动回调这个方法。 * 我们回想一下使用过程中,什么时候可能 改变节点当状态呢? * 大概是在控制台中,对canal-server中正在运行的 instance做"停止"操作时,改变了isActive。 * 可以 触发 HA。 */ public void handleDataChange(String dataPath, Object data) throws Exception { MDC.put("destination", destination); ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class); if (!isMine(runningData.getAddress())) { mutex.set(false); } if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active releaseRunning();// 彻底释放mainstem } activeData = (ServerRunningData) runningData; } /** * note: * 如果其他canal instance出现异常,临时节点数据被删除时,会自动回调这个方法,此时当前canal instance要顶上去 */ public void handleDataDeleted(String dataPath) throws Exception { MDC.put("destination", destination); mutex.set(false); if (!release && activeData != null && isMine(activeData.getAddress())) { // 如果上一次active的状态就是本机,则即时触发一下active抢占 initRunning(); } else { // 否则就是等待delayTime,避免因网络异常或者zk异常,导致出现频繁的切换操作 delayExector.schedule(new Runnable() { public void run() { initRunning(); } }, delayTime, TimeUnit.SECONDS); } } }; }
当注册节点发生变化时,会自动回调zkListener的handleDataChange方法。
我们回想一下使用过程中,什么时候可能 改变节点当状态呢?
就是在控制台中,对canal-server中正在运行的 instance做"停止"操作时,改变了isActive,可以 触发 HA。
如下图所示
4.admin的配置监控原理
我们现在采用admin做全局的配置控制。
那么每个canalServer是怎么监控配置的变化呢?
还记得上吗cananlController的start方法中对配置监视器的启动吗?
if (autoScan) { //note:启动线程定时去扫描配置 instanceConfigMonitors.get(globalInstanceConfig.getMode()).start(); //note:这部分代码似乎没有用,目前只能是manager或者spring两种方式二选一 for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) { if (!monitor.isStart()) { monitor.start(); } } }
这个就是关键的配置监控。
我们来看deployer模块中的monitor包了。
4.1 InstanceAction
是一个接口,有四个方法,用来获取配置后,对具体instance采取动作。
/** * config配置变化后的动作 * * @author jianghang 2013-2-18 下午01:19:29 * @version 1.0.1 */ public interface InstanceAction { /** * 启动destination */ void start(String destination); /** * 主动释放destination运行 */ void release(String destination); /** * 停止destination */ void stop(String destination); /** * 重载destination,可能需要stop,start操作,或者只是更新下内存配置 */ void reload(String destination); }
具体实现在canalController的构造器中实现了匿名类。
4.2 InstanceConfigMonitor
这个接口有两个实现,一个是基于spring的,一个基于manager(就是admin)。
我们看下基于manager配置的实现的ManagerInstanceConfigMonitor即可。
原理很简单。
- 采用一个固定大小线程池,每隔5s,使用PlainCanalConfigClient去拉取instance配置
- 然后通过defaultAction去start
- 这个start在canalController的构造器的匿名类中实现,会使用instance对应的runningMonitor做HA启动。具体逻辑上一小节已经详细介绍过了。
/** * 基于manager配置的实现 * * @author agapple 2019年8月26日 下午10:00:20 * @since 1.1.4 */ public class ManagerInstanceConfigMonitor extends AbstractCanalLifeCycle implements InstanceConfigMonitor, CanalLifeCycle { private long scanIntervalInSecond = 5; private InstanceAction defaultAction = null; /** * note: * 每个instance对应的instanceAction,实际上我们看代码发现都是用的同一个defaultAction */ private Map<String, InstanceAction> actions /** * note: * 每个instance对应的远程配置 */ private Map<String, PlainCanal> configs /** * note: * 一个固定大小线程池,每隔5s,使用PlainCanalConfigClient去拉取instance配置 */ private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("canal-instance-scan")); private volatile boolean isFirst = true; /** * note: * 拉取admin配置的client */ private PlainCanalConfigClient configClient; //… }
5.总结
deployer模块的主要作用:
1)读取canal.properties,确定canal instance的配置加载方式。如果使用了admin,那么还会定时拉取admin上的配置更新。
2)确定canal-server的启动方式:独立启动或者集群方式启动
3)利用zkClient监听canal instance在zookeeper上的状态变化,动态停止、启动或新增,实现了instance的HA
4)利用InstanceConfigMonitor,采用固定线程定时轮训admin,获取instance的最新配置
5)启动canal server,监听客户端请求
这里还有个非常有意思的问题没有展开说明,那就是CanalStarter里面的配置加载,通过ExtensionLoader类的相关实现,如何通过不同的类加载器,实现SPI,后面再分析吧。