Alibaba Canal Manager Model 配置管理实现

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: Alibaba Canal 用于增量订阅消费 mysql 数据库 binlog 日志,详细介绍请见 [alibaba/canal](https://github.com/alibaba/canal)。 其中 Server 端配置有两种管理方式: Spring 和 Manager。

Alibaba Canal Manager Model 配置管理实现

Alibaba Canal 用于增量订阅消费 mysql 数据库 binlog 日志,详细介绍请见 alibaba/canal
其中 Server 端配置有两种管理方式: Spring 和 Manager。其中 Spring 方式是基于spring xml + properties 进行定义构建 spring 配置, Manager 方式则可以对接 Web console/manager 系统。本文主要记录一下 Manager 方式的对接逻辑,源码在 canal-deployer 模块,相对比较简单。

源码入口

版本:canal-1.0.24
查看 Canal Server 端的脚本 ./bin/startup.sh,可以找到启动入口类是 CanalLauncher。该类 main 方法首先加载了配置文件到内存用于启动参数,./conf/canal.properties 文件。将参数传递给 final 类 CanalController,所以我们主要查看 CanalController 类。

配置 Manager 方式

要使用 Manager 方式,需要对启动参数进行设置。由前文可知启动时会先读取 canal.properties 文件,所以先需要在该文件增加以下配置

# 配置方式
canal.instance.global.mode=manager
# 是否开启自动扫描
canal.auto.scan=true
# 自动扫描间隔,单位秒
canal.auto.scan.interval=5
# 全局的manager配置方式的链接信息,用于标识该 Server
canal.instance.global.manager.address = 127.0.0.1:1099

这里需要简单说明几个概念
Server: 表示由 ./bin/startup.sh 脚本启动的程序,即一个 JVM;
Instance: 对应一个 Mysql 实例,代码为 CanalInstance 接口;
Destination: 字符串类型,对应一个 Instance;

CanalServerWithEmbedded 类: 连接 mysql master,管理多个 CanalInstance;
CanalServerWithNetty 类: 基于 netty 网络服务的 server 实现,用于与 Client 通讯;
CanalConfigClient 类: 存放配置相关信息;
CanalInstanceGenerator 接口: canal 实例生产者,根据 destination 以及 InstanceConfig 生产 CanalInstance;

其 Server 架构如下图
Server
启动一个 Server,里面可有多个 Instance,一个 Instance 读取一个 Mysql 实例的binlog 日志,Destination 则是对一个 Instance 实例的描述字符串,在该 Server 中唯一。

Manager 实现

Canal-Server 配置加载方式
加载方式

配置初始化

查看 CanalController 类
在其构造 函数可以看到 CanalInstanceGenerator 实例作为 CanalServerWithEmbedded 实例的组件。

public CanalController(final Properties properties) {
        managerClients = MigrateMap.makeComputingMap(new Function<String, CanalConfigClient>() {

            public CanalConfigClient apply(String managerAddress) {
                return getManagerClient(managerAddress);
            }
        });

        // 初始化全局参数设置
        globalInstanceConfig = initGlobalConfig(properties);
        instanceConfigs = new MapMaker().makeMap();
        // 初始化instance config
        initInstanceConfig(properties);

        // 准备canal server
        cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID));
        ip = getProperty(properties, CanalConstants.CANAL_IP);
        port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
        embededCanalServer = CanalServerWithEmbedded.instance();
        embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
        canalServer = CanalServerWithNetty.instance();
        canalServer.setIp(ip);
        canalServer.setPort(port);
        ......
}

查看 CanalInstanceGenerator 实例的具体实现,在初始化全局配置 initGlobalConfig 方法中

instanceGenerator = new CanalInstanceGenerator() {

    public CanalInstance generate(String destination) {
        InstanceConfig config = instanceConfigs.get(destination);
        if (config == null) {
            throw new CanalServerException("can't find destination:{}");
        }

        logger.info("CanalInstanceGenerator generate mode[{}]", config.getMode());
        if (config.getMode().isManager()) {
            ManagerCanalInstanceGenerator instanceGenerator = new ManagerCanalInstanceGenerator();
            instanceGenerator.setCanalConfigClient(managerClients.get(config.getManagerAddress()));
            return instanceGenerator.generate(destination);
        } else if (config.getMode().isSpring()) {
            SpringCanalInstanceGenerator instanceGenerator = new SpringCanalInstanceGenerator();
            synchronized (this) {
                try {
                    // 设置当前正在加载的通道,加载spring查找文件时会用到该变量
                    System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, destination);
                    instanceGenerator.setBeanFactory(getBeanFactory(config.getSpringXml()));
                    return instanceGenerator.generate(destination);
                } catch (Throwable e) {
                    logger.error("generator instance failed.", e);
                    throw new CanalException(e);
                } finally {
                    System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, "");
                }
            }
        } else {
            throw new UnsupportedOperationException("unknow mode :" + config.getMode());
        }

    }
};

可以看到 Manger 方式的配置主要是 CanalConfigClient,从 managerClients 集合中获取。该集合实现主要在构造器中,调用 getManagerClient 方法。所以我们主要通过该方法,构造 CanalConfigClient 实例返回即可。查看 CanalConfigClient 类主要有两个方法 findCanal, findFilter,可以确定 CanalConfigClient 表示整个 Server 的配置。getManagerClient 方法参数 managerAddress 则是在配置文件 canal.properties 中 destination 对应的 ip 地址,若没有配置则使用本机ip。这里比较奇怪的是,既然是通过 destination 获取的 ip,然后构建 CanalConfigClient 实例,为啥不让 CanalConfigClient 实例表示一个 destination 的配置而却是整个 Sever 的配置。

实现 CanalConfigClient 类,主要是一些参数的配置(如 Mysql 连接用户名、密码等), Canal 和字符串 Filter 的构建。Canal 实例的构建可以参考一下 CanalInstanceWithManager 类所用到的一些参数。

        Canal canal = new Canal();

        CanalParameter canalParameter = new CanalParameter();
        canalParameter.setSlaveId(dc.getSlaveId());
        canalParameter.setDbUsername(dc.getUsername());
        canalParameter.setDbPassword(dc.getPassword());
        canalParameter.setIndexMode(CanalParameter.IndexMode.MEMORY);
        List<InetSocketAddress> dbAddresses = new ArrayList<>();
        dbAddresses.add(new InetSocketAddress(dc.getHost(), dc.getPort()));
        canalParameter.setDbAddresses(dbAddresses);

        canal.setCanalParameter(canalParameter);
        canal.setName(destination);

到此可以说已经完成 Manger 方式配置的初始化,那如何更新配置呢,主要是用到了 ManagerInstanceConfigMonitor 类,该类在 CanalController 构造器中初始化。

配置刷新

CanalContoller 构造器中

......
    // 初始化monitor机制
        autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
        if (autoScan) {
            defaultAction = new InstanceAction() {

                public void start(String destination) {
                    InstanceConfig config = instanceConfigs.get(destination);
                    if (config == null) {
                        // 重新读取一下instance config
                        config = parseInstanceConfig(properties, destination);
                        instanceConfigs.put(destination, config);
                    }

                    if (!embededCanalServer.isStart(destination)) {
                        // HA机制启动
                        ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
                        if (!config.getLazy() && !runningMonitor.isStart()) {
                            runningMonitor.start();
                        }
                    }
                }

                public void stop(String destination) {
                    // 此处的stop,代表强制退出,非HA机制,所以需要退出HA的monitor和配置信息
                    InstanceConfig config = instanceConfigs.remove(destination);
                    if (config != null) {
                        embededCanalServer.stop(destination);
                        ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
                        if (runningMonitor.isStart()) {
                            runningMonitor.stop();
                        }
                    }
                }

                public void reload(String destination) {
                    // 目前任何配置变化,直接重启,简单处理
                    stop(destination);
                    start(destination);
                }
            };

            instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() {

                public InstanceConfigMonitor apply(InstanceMode mode) {
                    int scanInterval = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));

                    if (mode.isSpring()) {
                        SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
                        monitor.setScanIntervalInSecond(scanInterval);
                        monitor.setDefaultAction(defaultAction);
                        // 设置conf目录,默认是user.dir + conf目录组成
                        String rootDir = getProperty(properties, CanalConstants.CANAL_CONF_DIR);
                        if (StringUtils.isEmpty(rootDir)) {
                            rootDir = "../conf";
                        }

                        if (StringUtils.equals("otter-canal", System.getProperty("appName"))) {
                            monitor.setRootConf(rootDir);
                        } else {
                            // eclipse debug模式
                            monitor.setRootConf("src/main/resources/");
                        }
                        return monitor;
                    } else if (mode.isManager()) {
                        // 配置更新,实现 ManagerInstanceConfigMonitor 参考 SpringInstanceConfigMonitor, 使用上面的 defaultAction 即可
                        ManagerInstanceConfigMonitor monitor = new ManagerInstanceConfigMonitor();
                        monitor.setScanIntervalInSecond(scanInterval);
                        monitor.setDefaultAction(defaultAction);
                        monitor.setIp(ip);
                        return monitor;
                    } else {
                        throw new UnsupportedOperationException("unknow mode :" + mode + " for monitor");
                    }
                }
            });
        }
.....

查看 ManagerInstanceConfigMonitor 类,主要是实现 scan 方法,若配置有更新的话,回调 InstanceAction。这里还有个地方比较奇怪,可以看到 InstanceAction 回调方法参数是字符串 destination?我们在 scan 方法中调用后端服务器接口,已经获取到新的配置,却只能回调字符串 destination,然后在 CanalController.getManagerClient 方法根据 destination 再去调服务器获取具体配置?这里多了一次调用服务器接口,感觉比较奇怪。

总结

Manager 模式实现步骤

1. 在 canal.properties 配置文件设置 `canal.instance.global.mode=manager`等;
2. 在 CanalController 类构建 CanalConfigClient 实例,根据 ManagerAddress 从 managerClients 获取 CanalConfigClient (根据destination 获取 Canal, Filter);
3. 实现 ManagerInstanceConfigMonitor,启用定时线程刷新配置,使用 InstanceAction 当作回调与 CanalServerWithEmbedded 通信,
配置的过滤 Filter 传递到 EventParser 中的 BinlogParser (实现类 LogEventConvert),所以感觉这里的 Filter 没什么意义,倒不如让 Client 消费所有数据,然后下发再做过滤。
目录
相关文章
|
Nacos
Nacos源码构建报错程序包不存在com.alibaba.nacos.consistency.entity
Nacos源码构建报错程序包不存在com.alibaba.nacos.consistency.entity
461 0
Nacos源码构建报错程序包不存在com.alibaba.nacos.consistency.entity
|
3月前
|
API
【Azure Cloud Service】Cloud Service(Classic) 迁移失败,找不到解决方案怎么办?
【Azure Cloud Service】Cloud Service(Classic) 迁移失败,找不到解决方案怎么办?
|
1月前
|
JSON API 数据安全/隐私保护
【Azure Cloud Service】使用RESTAPI更新Cloud Service(Extended Support) 中所配置的证书
本文介绍了在更新Azure Cloud Service (Extended Support) 证书时,若旧证书(如中间证书、根证书)存储在Key Vault Secret中,而新证书仅匹配到服务器证书时,可能导致的错误及解决方法。建议使用PowerShell或RestAPI进行涉及机密的更新。文章详细描述了使用REST API更新证书的三个步骤:上传证书到Azure Key Vault、获取Cloud Service信息并发送GET请求、更新Cloud Service信息并发送PUT请求。通过这些步骤,可以成功更新证书并在云服务节点中验证证书信息。
|
3月前
|
API 网络架构
【Azure Cloud Service(Extended Support)】如何使用外延服务迁移应用?
【Azure Cloud Service(Extended Support)】如何使用外延服务迁移应用?
|
3月前
|
Java Spring
【Azure 应用服务】记一次Azure Spring Cloud 的部署错误 (az spring-cloud app deploy -g dev -s testdemo -n demo -p ./hellospring-0.0.1-SNAPSHOT.jar --->>> Failed to wait for deployment instances to be ready)
【Azure 应用服务】记一次Azure Spring Cloud 的部署错误 (az spring-cloud app deploy -g dev -s testdemo -n demo -p ./hellospring-0.0.1-SNAPSHOT.jar --->>> Failed to wait for deployment instances to be ready)
|
3月前
|
存储 Kubernetes 关系型数据库
Kubernetes(K8S) 安装Nacos,报 No DataSource set
Kubernetes(K8S) 安装Nacos,报 No DataSource set
46 0
|
6月前
|
Java Nacos Docker
Spring Cloud Alibaba【什么是Nacos、Nacos Server下载安装 、Docker安装Nacos Server服务、微服务聚合父工程构建】(一)
Spring Cloud Alibaba【什么是Nacos、Nacos Server下载安装 、Docker安装Nacos Server服务、微服务聚合父工程构建】(一)
192 0
|
消息中间件 监控 应用服务中间件
EDAS(Enterprise Distributed Application Service)
EDAS(Enterprise Distributed Application Service)是阿里云提供的一款企业级分布式应用服务,旨在为企业提供高可用、高性能、高可扩展性的分布式应用服务,支持多种应用程序的开发、部署、运行和管理。EDAS包含了多个子产品,如应用服务、容器服务、消息队列等,可以满足企业在分布式应用场景下的各种需求。
220 0
|
监控 Cloud Native Java
EDAS(Enterprise Distributed Application Service
EDAS(Enterprise Distributed Application Service)是阿里云推出的一款云原生应用管理平台,可以帮助企业快速构建、部署和管理云原生应用。EDAS提供了一系列的服务,包括应用部署、配置管理、监控告警、日志查询、分布式跟踪等,可以帮助企业在云上快速构建稳定、高效、安全的应用系统。
312 0
|
Java Nacos 数据中心
Spring Cloud Alibaba - 05 Nacos 领域模型_NameSpac/Group/Cluster
Spring Cloud Alibaba - 05 Nacos 领域模型_NameSpac/Group/Cluster
116 0