Alibaba Canal Manager Model 配置管理实现

本文涉及的产品
RDS AI 助手,专业版
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
简介: Alibaba Canal 用于增量订阅消费 mysql 数据库 binlog 日志,详细介绍请见 [alibaba/canal](https://github.com/alibaba/canal)。 其中 Server 端配置有两种管理方式: Spring 和 Manager。

Alibaba Canal Manager Model 配置管理实现

Alibaba Canal 用于增量订阅消费 mysql 数据库 binlog 日志,详细介绍请见 alibaba/canal
其中 Server 端配置有两种管理方式: Spring 和 Manager。其中 Spring 方式是基于spring xml + properties 进行定义构建 spring 配置, Manager 方式则可以对接 Web console/manager 系统。本文主要记录一下 Manager 方式的对接逻辑,源码在 canal-deployer 模块,相对比较简单。

源码入口

版本:canal-1.0.24
查看 Canal Server 端的脚本 ./bin/startup.sh,可以找到启动入口类是 CanalLauncher。该类 main 方法首先加载了配置文件到内存用于启动参数,./conf/canal.properties 文件。将参数传递给 final 类 CanalController,所以我们主要查看 CanalController 类。

配置 Manager 方式

要使用 Manager 方式,需要对启动参数进行设置。由前文可知启动时会先读取 canal.properties 文件,所以先需要在该文件增加以下配置

# 配置方式
canal.instance.global.mode=manager
# 是否开启自动扫描
canal.auto.scan=true
# 自动扫描间隔,单位秒
canal.auto.scan.interval=5
# 全局的manager配置方式的链接信息,用于标识该 Server
canal.instance.global.manager.address = 127.0.0.1:1099

这里需要简单说明几个概念
Server: 表示由 ./bin/startup.sh 脚本启动的程序,即一个 JVM;
Instance: 对应一个 Mysql 实例,代码为 CanalInstance 接口;
Destination: 字符串类型,对应一个 Instance;

CanalServerWithEmbedded 类: 连接 mysql master,管理多个 CanalInstance;
CanalServerWithNetty 类: 基于 netty 网络服务的 server 实现,用于与 Client 通讯;
CanalConfigClient 类: 存放配置相关信息;
CanalInstanceGenerator 接口: canal 实例生产者,根据 destination 以及 InstanceConfig 生产 CanalInstance;

其 Server 架构如下图
Server
启动一个 Server,里面可有多个 Instance,一个 Instance 读取一个 Mysql 实例的binlog 日志,Destination 则是对一个 Instance 实例的描述字符串,在该 Server 中唯一。

Manager 实现

Canal-Server 配置加载方式
加载方式

配置初始化

查看 CanalController 类
在其构造 函数可以看到 CanalInstanceGenerator 实例作为 CanalServerWithEmbedded 实例的组件。

public CanalController(final Properties properties) {
        managerClients = MigrateMap.makeComputingMap(new Function<String, CanalConfigClient>() {

            public CanalConfigClient apply(String managerAddress) {
                return getManagerClient(managerAddress);
            }
        });

        // 初始化全局参数设置
        globalInstanceConfig = initGlobalConfig(properties);
        instanceConfigs = new MapMaker().makeMap();
        // 初始化instance config
        initInstanceConfig(properties);

        // 准备canal server
        cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID));
        ip = getProperty(properties, CanalConstants.CANAL_IP);
        port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
        embededCanalServer = CanalServerWithEmbedded.instance();
        embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
        canalServer = CanalServerWithNetty.instance();
        canalServer.setIp(ip);
        canalServer.setPort(port);
        ......
}

查看 CanalInstanceGenerator 实例的具体实现,在初始化全局配置 initGlobalConfig 方法中

instanceGenerator = new CanalInstanceGenerator() {

    public CanalInstance generate(String destination) {
        InstanceConfig config = instanceConfigs.get(destination);
        if (config == null) {
            throw new CanalServerException("can't find destination:{}");
        }

        logger.info("CanalInstanceGenerator generate mode[{}]", config.getMode());
        if (config.getMode().isManager()) {
            ManagerCanalInstanceGenerator instanceGenerator = new ManagerCanalInstanceGenerator();
            instanceGenerator.setCanalConfigClient(managerClients.get(config.getManagerAddress()));
            return instanceGenerator.generate(destination);
        } else if (config.getMode().isSpring()) {
            SpringCanalInstanceGenerator instanceGenerator = new SpringCanalInstanceGenerator();
            synchronized (this) {
                try {
                    // 设置当前正在加载的通道,加载spring查找文件时会用到该变量
                    System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, destination);
                    instanceGenerator.setBeanFactory(getBeanFactory(config.getSpringXml()));
                    return instanceGenerator.generate(destination);
                } catch (Throwable e) {
                    logger.error("generator instance failed.", e);
                    throw new CanalException(e);
                } finally {
                    System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, "");
                }
            }
        } else {
            throw new UnsupportedOperationException("unknow mode :" + config.getMode());
        }

    }
};

可以看到 Manger 方式的配置主要是 CanalConfigClient,从 managerClients 集合中获取。该集合实现主要在构造器中,调用 getManagerClient 方法。所以我们主要通过该方法,构造 CanalConfigClient 实例返回即可。查看 CanalConfigClient 类主要有两个方法 findCanal, findFilter,可以确定 CanalConfigClient 表示整个 Server 的配置。getManagerClient 方法参数 managerAddress 则是在配置文件 canal.properties 中 destination 对应的 ip 地址,若没有配置则使用本机ip。这里比较奇怪的是,既然是通过 destination 获取的 ip,然后构建 CanalConfigClient 实例,为啥不让 CanalConfigClient 实例表示一个 destination 的配置而却是整个 Sever 的配置。

实现 CanalConfigClient 类,主要是一些参数的配置(如 Mysql 连接用户名、密码等), Canal 和字符串 Filter 的构建。Canal 实例的构建可以参考一下 CanalInstanceWithManager 类所用到的一些参数。

        Canal canal = new Canal();

        CanalParameter canalParameter = new CanalParameter();
        canalParameter.setSlaveId(dc.getSlaveId());
        canalParameter.setDbUsername(dc.getUsername());
        canalParameter.setDbPassword(dc.getPassword());
        canalParameter.setIndexMode(CanalParameter.IndexMode.MEMORY);
        List<InetSocketAddress> dbAddresses = new ArrayList<>();
        dbAddresses.add(new InetSocketAddress(dc.getHost(), dc.getPort()));
        canalParameter.setDbAddresses(dbAddresses);

        canal.setCanalParameter(canalParameter);
        canal.setName(destination);

到此可以说已经完成 Manger 方式配置的初始化,那如何更新配置呢,主要是用到了 ManagerInstanceConfigMonitor 类,该类在 CanalController 构造器中初始化。

配置刷新

CanalContoller 构造器中

......
    // 初始化monitor机制
        autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
        if (autoScan) {
            defaultAction = new InstanceAction() {

                public void start(String destination) {
                    InstanceConfig config = instanceConfigs.get(destination);
                    if (config == null) {
                        // 重新读取一下instance config
                        config = parseInstanceConfig(properties, destination);
                        instanceConfigs.put(destination, config);
                    }

                    if (!embededCanalServer.isStart(destination)) {
                        // HA机制启动
                        ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
                        if (!config.getLazy() && !runningMonitor.isStart()) {
                            runningMonitor.start();
                        }
                    }
                }

                public void stop(String destination) {
                    // 此处的stop,代表强制退出,非HA机制,所以需要退出HA的monitor和配置信息
                    InstanceConfig config = instanceConfigs.remove(destination);
                    if (config != null) {
                        embededCanalServer.stop(destination);
                        ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
                        if (runningMonitor.isStart()) {
                            runningMonitor.stop();
                        }
                    }
                }

                public void reload(String destination) {
                    // 目前任何配置变化,直接重启,简单处理
                    stop(destination);
                    start(destination);
                }
            };

            instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() {

                public InstanceConfigMonitor apply(InstanceMode mode) {
                    int scanInterval = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));

                    if (mode.isSpring()) {
                        SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
                        monitor.setScanIntervalInSecond(scanInterval);
                        monitor.setDefaultAction(defaultAction);
                        // 设置conf目录,默认是user.dir + conf目录组成
                        String rootDir = getProperty(properties, CanalConstants.CANAL_CONF_DIR);
                        if (StringUtils.isEmpty(rootDir)) {
                            rootDir = "../conf";
                        }

                        if (StringUtils.equals("otter-canal", System.getProperty("appName"))) {
                            monitor.setRootConf(rootDir);
                        } else {
                            // eclipse debug模式
                            monitor.setRootConf("src/main/resources/");
                        }
                        return monitor;
                    } else if (mode.isManager()) {
                        // 配置更新,实现 ManagerInstanceConfigMonitor 参考 SpringInstanceConfigMonitor, 使用上面的 defaultAction 即可
                        ManagerInstanceConfigMonitor monitor = new ManagerInstanceConfigMonitor();
                        monitor.setScanIntervalInSecond(scanInterval);
                        monitor.setDefaultAction(defaultAction);
                        monitor.setIp(ip);
                        return monitor;
                    } else {
                        throw new UnsupportedOperationException("unknow mode :" + mode + " for monitor");
                    }
                }
            });
        }
.....

查看 ManagerInstanceConfigMonitor 类,主要是实现 scan 方法,若配置有更新的话,回调 InstanceAction。这里还有个地方比较奇怪,可以看到 InstanceAction 回调方法参数是字符串 destination?我们在 scan 方法中调用后端服务器接口,已经获取到新的配置,却只能回调字符串 destination,然后在 CanalController.getManagerClient 方法根据 destination 再去调服务器获取具体配置?这里多了一次调用服务器接口,感觉比较奇怪。

总结

Manager 模式实现步骤

1. 在 canal.properties 配置文件设置 `canal.instance.global.mode=manager`等;
2. 在 CanalController 类构建 CanalConfigClient 实例,根据 ManagerAddress 从 managerClients 获取 CanalConfigClient (根据destination 获取 Canal, Filter);
3. 实现 ManagerInstanceConfigMonitor,启用定时线程刷新配置,使用 InstanceAction 当作回调与 CanalServerWithEmbedded 通信,
配置的过滤 Filter 传递到 EventParser 中的 BinlogParser (实现类 LogEventConvert),所以感觉这里的 Filter 没什么意义,倒不如让 Client 消费所有数据,然后下发再做过滤。
目录
相关文章
|
4月前
|
人工智能 IDE 开发工具
AI编程之手把手教你安装Qoder
Qoder是一款基于VSCode开发的AI编程工具,安装简单,界面熟悉。注册账号后下载对应系统版本,按提示完成安装即可。适合开发者快速上手,配合教程视频更易掌握。
2596 0
|
10月前
|
编解码 网络协议 算法
SpringBoot × TCP 极速开发指南:工业级TCP通信协议栈操作手册
🌟 ​大家好,我是摘星!​ 🌟今天为大家带来的是并发编程的SpringBoot × TCP 极速开发指南,废话不多说直接开始~
686 0
|
6月前
|
存储 NoSQL Java
配置RedisTemplate序列化机制
通过上述步骤,你可以灵活配置RedisTemplate的序列化机制,根据应用需求选择合适的序列化器,从而确保数据在Redis中的存储和读取效率最优化。配置合适的序列化机制对于性能和存储效率至关重要,而且这样可以确保数据在存储和传输过程中的结构清晰和一致性。
357 11
|
资源调度 运维 前端开发
超强开源全能日程助手—揭秘FullCalendar
超强开源全能日程助手—揭秘FullCalendar
1141 3
|
Cloud Native Java Nacos
Consul 留给你的时间不多了
Consul 留给你的时间不多了
638 88
|
关系型数据库 MySQL API
flinkcdc不做任何处理,直接mysql同步到 mysql 的过程中 sink 使用 哪个方法?
flinkcdc不做任何处理,直接mysql同步到 mysql 的过程中 sink 使用 哪个方法?
595 1
|
Java 编译器 Spring
Spring AOP 和 AspectJ 的区别
Spring AOP和AspectJ AOP都是面向切面编程(AOP)的实现,但它们在实现方式、灵活性、依赖性、性能和使用场景等方面存在显著区别。‌
683 2
|
JSON 安全 前端开发
跨域详解及Spring Boot 3中的跨域解决方案
本文介绍了Web开发中的跨域问题,包括概念、原因、影响以及在Spring Boot 3中的解决方案。跨域是由浏览器的同源策略限制引起的,阻碍了不同源之间的数据传输。解决方法包括CORS、JSONP和代理服务器。在Spring Boot 3中,可以通过配置CorsFilter来允许跨域请求,实现前后端分离项目的正常运行。
1233 3
 跨域详解及Spring Boot 3中的跨域解决方案
|
JSON Java Maven
实现Java Spring Boot FCM推送教程
详细介绍实现Java Spring Boot FCM推送教程
503 0
|
消息中间件 编解码 网络协议
京东面试 rockmq是推消息还是拉消息?他的消息模型是啥?
RocketMQ采用拉模式结合长轮询模拟推效果,减少延迟并优化资源使用。在长轮询中,服务器在无消息时保持请求开放,待有新消息时立即响应,提升实时性。利用Netty的TCP连接和异步处理,RocketMQ构建高效通信协议,适应不同吞吐量和实时性需求场景,兼顾控制与实时响应。
396 0
京东面试 rockmq是推消息还是拉消息?他的消息模型是啥?