canal针对分库分表场景的高可用架构设计与应用

本文涉及的产品
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
RDS DuckDB + QuickBI 企业套餐,8核32GB + QuickBI 专业版
RDS AI 助手,专业版
简介: canal针对分库分表场景的高可用架构设计与应用

一 架构设计



网络异常,图片无法展示
|


简单架构设计


说明:

  1. 两个mysql库中均创建有canal/canal的账户;
  2. 这里A、B两个mysql库是用来模拟t_dept进行分库分表;
  3. 另外,在A、B两种表中都创建有表t_canal.
  4. canal原理: 可查看文章【了解canal,看这个就够了】
  5. 安装与搭建流程:可参考文章【canal应用-1个server+2个instance+2个client+2个mysql】


处理分表分库的场景,主要是要使用配置group-instance.xml。group-instance主要针对需要进行多库合并时,可以将多个物理instance合并为一个逻辑instance,提供客户端访问。


比如产品数据拆分了4个库,每个库会有一个instance,如果不用group,业务上要消费数据时,需要启动4个客户端,分别链接4个instance实例。使用group后,可以在canal server上合并为一个逻辑instance,只需要启动1个客户端,链接这个逻辑instance即可。


二 关键实现流程


2.1 canal.properties配置文件


canal.properties是对应一个canal server的全局配置,保存位置:/usr/local/hadoop/app/canal_group/conf/canal.properties。配置修改内容如下:


canal.id = 1 #唯一标识
canal.ip =192.168.175.20 # client访问canal server的ip地址
canal.port = 11111  # client访问canal server的端口
#canal.instance.global.spring.xml = classpath:spring/file-instance.xml   #原来是这个
canal.instance.global.spring.xml = classpath:spring/group-instance.xml  #启动这个
#其他配置保持默认即可.


2.2 instance.properties配置文件


使用如下命令复制出两个代表canal instance的文件夹:


cp -R example t_dept;
cp -R example t_canal;
rm -rf example;


调整配置文件/usr/local/hadoop/app/canal/conf/t_dept/instance.properties如下:


#canal.instance.master.address=192.168.175.21:3306 #原来的
canal.instance.master1.address=192.168.175.21:3306 #新增,与group-instance.xml的对应
canal.instance.master2.address=192.168.175.22:3306 #新增,与group-instance.xml的对应
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# mq config
canal.mq.topic=t_dept


调整配置文件/usr/local/hadoop/app/canal/conf/t_canal/instance.properties如下:


#canal.instance.master.address=192.168.175.21:3306 #原来的
canal.instance.master1.address=192.168.175.21:3306 #新增,与group-instance.xml的对应
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# mq config
canal.mq.topic=t_canal


2.3 group-instance.xml配置文件

配置文件/usr/local/hadoop/app/canal/conf/spring/group-instance.xml不需要做调整。


2.4 启动canal server

进入文件夹/usr/local/hadoop/app/canal/bin执行如下启动命令:


./startup.sh


查看日志/usr/local/hadoop/app/canal/logs/canal/canal.log,出现如下内容,即表示启动成功:


2019-06-07 21:15:03.372 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2019-06-07 21:15:03.427 [main] INFO  c.a.o.c.d.monitor.remote.RemoteConfigLoaderFactory - ## load local canal configurations
2019-06-07 21:15:03.529 [main] INFO  com.alibaba.otter.canal.deployer.CanalStater - ## start the canal server.
2019-06-07 21:15:06.251 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.175.20:11111]
2019-06-07 21:15:22.245 [main] INFO  com.alibaba.otter.canal.deployer.CanalStater - ## the canal server is running now ......


2.5 使用canal client连接canal server


注意运行canal客户端代码时,一定要先启动canal server!!!

(1) 添加pom依赖


<!--canal-->
    <dependency>
      <groupId>com.alibaba.otter</groupId>
      <artifactId>canal.client</artifactId>
      <version>1.1.3</version>
    </dependency>


(2) canal client代码:


package com.xgh.canal;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
public class CanalClientGroupTest1 {
    public static void main(String args[]) {
        //String zkHost="192.168.175.20:2181,192.168.175.21:2181,192.168.175.22:2181";
        // 创建链接
        //CanalConnector connector = CanalConnectors.newClusterConnector(zkHost,"example","","");
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.175.21", 11111),
                "t_dept", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        long batchId = 0;
        //外层死循环:在canal节点宕机后,抛出异常,等待zk对canal处理切换,切换完后,继续创建连接处理数据
        while(true) {
            try {
                connector.connect();
                connector.subscribe(".*\\..*");//订阅所有库下面的所有表
                //connector.subscribe("canal.t_canal");//订阅库canal库下的表t_canal
                connector.rollback();
                //内层死循环:按频率实时监听数据变化,一旦收到变化数据,立即做消费处理,并ack,考虑消费速度,可以做异步处理并ack.
                while (true) {
                    Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                    batchId = message.getId();
                    int size = message.getEntries().size();
                    //// 偏移量不等于-1 或者 获取的数据条数不为0 时,认为拿到消息,并处理
                    if (batchId == -1 || size == 0) {
                        emptyCount++;
                        System.out.println("empty count : " + emptyCount);//此時代表當前數據庫無遍更數據
                        Thread.sleep(1000); //1000ms拉一次变动数据
                    } else {
                        emptyCount = 0;
                        System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                        printEntry(message.getEntries());
                    }
                    connector.ack(batchId); // 提交确认
                    //
                }
            }catch(Exception e){
                e.printStackTrace();
                connector.rollback(batchId); // 处理失败, 回滚数据
            } finally {
                connector.disconnect();
            }
        }
    }
    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            System.out.println("rowChare ======>"+rowChage.toString());
            EventType eventType = rowChage.getEventType(); //事件類型,比如insert,update,delete
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(),//mysql的my.cnf配置中的log-bin名稱
                    entry.getHeader().getLogfileOffset(), //偏移量
                    entry.getHeader().getSchemaName(),//庫名
                    entry.getHeader().getTableName(), //表名
                    eventType));//事件名
            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }
    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}


2.6 其他


将canal client代码CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.175.21", 11111), "t_dept", "", "");中的队列名t_dept换成t_canal再执行,就可以监听t_canal对应数据变化了.


三 运行测试及总结



1. 监听t_dept的canal client可以接收到数据库A和B的数据变化
2. 监听t_canal的canal client只能接收到数据库B的数据变化
3. 数据过滤的设置问题


当在instance.properties和canal client中对设置filter时,canal client的设置会覆盖instance.properties中的配置。所以不如干脆保持instance.properties为默认状态,也即是不过滤,然后过滤全部设置在canal client中,如下:


connector.connect();
connector.subscribe(".*\\..*");//订阅所有库下面的所有表
//connector.subscribe("canal.t_canal");//订阅库canal库下的表t_canal


四 高可用架构设计



网络异常,图片无法展示
|


高可用架构设计


参考文章:
  1. https://www.cnblogs.com/yulu080808/p/8819260.html
  2. https://github.com/alibaba/canal
  3. https://blog.csdn.net/my201110lc/article/details/78836270
相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
相关文章
|
7月前
|
SQL 监控 关系型数据库
MySQL主从复制:构建高可用架构
本文深入解析MySQL主从复制原理与实战配置,涵盖复制架构、监控管理、高可用设计及性能优化,助你构建企业级数据库高可用方案。
|
8月前
|
运维 监控 搜索推荐
MSE ZooKeeper:Flink 高可用架构的企业级选择
本文深入解析了 Apache Flink 架构中 ZooKeeper 的核心作用,包括 Leader 选举、Checkpoint 管理、作业协调及配置管理等关键功能,并结合金融风控与电商推荐等典型场景,分析了 ZooKeeper 在实际应用中的技术实现。
|
6月前
|
运维 监控 安全
公链开发中的高可用架构设计要点
本指南提供公链高可用架构的可复用流程与模板,涵盖目标拆解、先决条件、分步执行、故障排查及验收标准,结合跨链DApp与量化机器人案例,提升落地效率与系统稳定性。
|
6月前
|
人工智能 JavaScript 前端开发
GenSX (不一样的AI应用框架)架构学习指南
GenSX 是一个基于 TypeScript 的函数式 AI 工作流框架,以“函数组合替代图编排”为核心理念。它通过纯函数组件、自动追踪与断点恢复等特性,让开发者用自然代码构建可追溯、易测试的 LLM 应用。支持多模型集成与插件化扩展,兼具灵活性与工程化优势。
533 6
|
7月前
|
人工智能 Cloud Native 中间件
划重点|云栖大会「AI 原生应用架构论坛」看点梳理
本场论坛将系统性阐述 AI 原生应用架构的新范式、演进趋势与技术突破,并分享来自真实生产环境下的一线实践经验与思考。
|
7月前
|
存储 监控 NoSQL
Redis高可用架构全解析:从主从复制到集群方案
Redis高可用确保服务持续稳定,避免单点故障导致数据丢失或业务中断。通过主从复制实现数据冗余,哨兵模式支持自动故障转移,Cluster集群则提供分布式数据分片与水平扩展,三者层层递进,保障读写分离、容灾切换与大规模数据存储,构建高性能、高可靠的Redis架构体系。
|
7月前
|
机器学习/深度学习 人工智能 vr&ar
H4H:面向AR/VR应用的NPU-CIM异构系统混合卷积-Transformer架构搜索——论文阅读
H4H是一种面向AR/VR应用的混合卷积-Transformer架构,基于NPU-CIM异构系统,通过神经架构搜索实现高效模型设计。该架构结合卷积神经网络(CNN)的局部特征提取与视觉Transformer(ViT)的全局信息处理能力,提升模型性能与效率。通过两阶段增量训练策略,缓解混合模型训练中的梯度冲突问题,并利用异构计算资源优化推理延迟与能耗。实验表明,H4H在相同准确率下显著降低延迟和功耗,为AR/VR设备上的边缘AI推理提供了高效解决方案。
1354 0
|
6月前
|
机器学习/深度学习 自然语言处理 算法
48_动态架构模型:NAS在LLM中的应用
大型语言模型(LLM)在自然语言处理领域的突破性进展,很大程度上归功于其庞大的参数量和复杂的网络架构。然而,随着模型规模的不断增长,计算资源消耗、推理延迟和部署成本等问题日益凸显。如何在保持模型性能的同时,优化模型架构以提高效率,成为2025年大模型研究的核心方向之一。神经架构搜索(Neural Architecture Search, NAS)作为一种自动化的网络设计方法,正在为这一挑战提供创新性解决方案。本文将深入探讨NAS技术如何应用于LLM的架构优化,特别是在层数与维度调整方面的最新进展,并通过代码实现展示简单的NAS实验。
337 0
|
8月前
|
Web App开发 Linux 虚拟化
Omnissa Horizon 8 2506 (8.16) - 虚拟桌面基础架构 (VDI) 和应用软件
Omnissa Horizon 8 2506 (8.16) - 虚拟桌面基础架构 (VDI) 和应用软件
447 0
Omnissa Horizon 8 2506 (8.16) - 虚拟桌面基础架构 (VDI) 和应用软件
|
8月前
|
机器学习/深度学习 数据采集 存储
技术赋能下的能源智慧管理:MyEMS 开源系统的架构创新与应用深化
在全球能源转型与“双碳”战略推动下,MyEMS作为基于Python的开源能源管理系统,凭借模块化架构与AI技术,助力重点用能单位实现数字化、智能化能源管理。系统支持多源数据采集、智能分析、设备数字孪生与自适应优化控制,全面满足国家级能耗监测要求,并已在制造、数据中心、公共建筑等领域成功应用,助力节能降碳,推动绿色可持续发展。
257 0