canal之高可用架构设计与应用

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,高可用系列 2核4GB
MSE Nacos/ZooKeeper 企业版试用,1600元额度,限量50份
简介: canal之高可用架构设计与应用

一 高可用架构设计



网络异常,图片无法展示
|


架构设计图


配置说明:

zookeeper x 3 + canal x 2 + mysql x 2


组件说明:

  1. linux内核版本(CentOS Linux 7):(命令:uname -a)
    Linux slave1 3.10.0-693.el7.x86_64 #1 SMP Tue Aug 22 21:09:27 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
  2. mysql版本:(SQL命令:select version(); 或 status)
    Server version: 5.6.43-log MySQL Community Server (GPL)
  3. canal版本:canal-1.1.3
  4. zookeeper版本:zookeeper-3.4.5-cdh5.7.0
  5. JDK版本: 1.8


canal工作原理:

  1. 模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议;
  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal);
  3. 解析binary log对象(原始为byte流)

了解更多详细更新可以查看文章:【了解canal,看这个就够了】


二 配置与部署流程



2.1 安装mysql数据库


1. 下载安装

在192.168.175.21和192.168.175.22两台服务器上分别安装mysql,具体安装流程可参考文章:Linux-安装MySQL.


2. 创建canal账户

在创建root账号并设置远程访问之后,接着创建canal账号并设置远程访问和权限:

mysql> CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
mysql> GRANT ALL ON canal.* TO 'canal'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'canal'@'%';
mysql>FLUSH PRIVILEGES;

3. 验证登录

#远程登录
mysql -h 192.168.175.22 -P 3306 -u canal -pcanal
#本地登录
mysql -ucanal -pcanal

4. 修改my.cnf配置(这一步非常关键!!!)


分别在175.21和175.22两台服务器修改my.conf配置,查找my.cnf配置位置命令:whereis my.


示例,在192.168.175.21的my.cnf配置新增如下内容:

log_bin=mysql-bin  #指定bin-log的名称,尽量可以标识业务含义
binlog_format=row  #选择row模式,必须!!!
server_id=1  #mysql服务器id


2.2 搭建zookeeper集群


搭建zookeeper集群地址为192.168.175.20:2181,192.168.175.21:2181,192.168.175.22:2181,具体搭建流程,可查看文章【Zookeepr3.4.5集群搭建】


2.3 搭建canal server集群


前提: mysql已打开binlog功能,且配置binlog模式为row.


1. 下载最新canal安装包

下载地址: https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz


2.上传并解压

进入192.168.175.20服务器,使用rz命令上传,使用如下命令进行解压至/usr/local/hadoop/app/canal:

tar xzvf canal.deployer-1.1.3.tar.gz -C canal

3. 修改配置instance.properties

新解压的文件夹/usr/local/hadoop/app/canal/conf/有一个example文件夹,一个example就代表一个instance实例.而一个instance实例就是一个消息队列,所以这里可以将文件名改为example1,同时再复制出来一个叫example2.(命名可以使用监听的数据库名)


修改/usr/local/hadoop/app/canal/conf/example1/instance.properties配置文件:

canal.instance.master.address=192.168.175.21:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.mq.topic=example1


修改/usr/local/hadoop/app/canal/conf/example2/instance.properties配置文件:

canal.instance.master.address=192.168.175.22:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.mq.topic=example2


配置文件参数说明,可查看:https://github.com/alibaba/canal/wiki/AdminGuide


4. 修改配置canal.properties

配置/usr/local/hadoop/app/canal/conf/canal.properties是一个对应canal server的全局配置(instance.properties是对应canal instance的配置)。

canal.id = 2  #保证每个canal server的id不同
canal.port = 11111
canal.zkServers =192.168.175.20:2181,192.168.175.21:2181,192.168.175.22:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
#其他配置默认即可.


注意: 两台机器上的instance目录的名字需要保证完全一致,HA模式是依赖于instance name进行管理,同时必须都选择default-instance.xml配置。

配置完成,将文件从192.168.175.20远程复制一份到192.168.175.22上:

#需要确保已开通免密
scp -rp /usr/local/hadoop/app/canal slave2:/usr/local/hadoop/app/

5. 启动canal server

进入文件夹/usr/local/hadoop/app/canal/bin执行如下启动命令:

./startup.sh


查看日志/usr/local/hadoop/app/canal/logs/canal/canal.log,出现如下内容,即表示启动成功:

2019-06-07 21:15:03.372 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2019-06-07 21:15:03.427 [main] INFO  c.a.o.c.d.monitor.remote.RemoteConfigLoaderFactory - ## load local canal configurations
2019-06-07 21:15:03.529 [main] INFO  com.alibaba.otter.canal.deployer.CanalStater - ## start the canal server.
2019-06-07 21:15:06.251 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.175.22:11111]
2019-06-07 21:15:22.245 [main] INFO  com.alibaba.otter.canal.deployer.CanalStater - ## the canal server is running now ......


在zk集群中查看canal节点注册情况:

[zk: localhost:2181(CONNECTED) 27] ls2 /otter/canal/destinations
[example2, example1]
[zk: localhost:2181(CONNECTED) 26] ls2 /otter/canal/cluster
[192.168.175.22:11111, 192.168.175.20:11111]


可以看到canal server节点已经在zk集群上注册成功.

当停掉一个canal server时,可以看到zk上对应的临时节点也会删除.


2.4 使用canal client通过zookeeper连接canal server集群


注意运行canal客户端代码时,一定要先启动canal server!!!

(1) 添加pom依赖

<!--canal-->
    <dependency>
      <groupId>com.alibaba.otter</groupId>
      <artifactId>canal.client</artifactId>
      <version>1.1.3</version>
    </dependency>


(2) canal client代码:

package com.xgh.canal;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
public class TestCanalByZk {
    public static void main(String args[]) {
        String zkHost="192.168.175.20:2181,192.168.175.21:2181,192.168.175.22:2181";
        // 创建链接
        CanalConnector connector = CanalConnectors.newClusterConnector(zkHost,"example1","","");
        /*CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.175.22", 11111),
                "example", "", "");*/
        int batchSize = 1000;
        int emptyCount = 0;
        long batchId = 0;
        //外层死循环:在canal节点宕机后,抛出异常,等待zk对canal处理切换,切换完后,继续创建连接处理数据
        while(true) {
            try {
                connector.connect();
                connector.subscribe(".*\\..*");//订阅所有库下面的所有表
                //connector.subscribe("canal.t_canal");//订阅库canal库下的表t_canal
                connector.rollback();
                //内层死循环:按频率实时监听数据变化,一旦收到变化数据,立即做消费处理,并ack,考虑消费速度,可以做异步处理并ack.
                while (true) {
                    Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                    batchId = message.getId();
                    int size = message.getEntries().size();
                    //// 偏移量不等于-1 或者 获取的数据条数不为0 时,认为拿到消息,并处理
                    if (batchId == -1 || size == 0) {
                        emptyCount++;
                        System.out.println("empty count : " + emptyCount);//此時代表當前數據庫無遍更數據
                        Thread.sleep(200); //200ms拉一次变动数据
                    } else {
                        emptyCount = 0;
                        System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                        printEntry(message.getEntries());
                    }
                    connector.ack(batchId); // 提交确认
                }
            }catch(Exception e){
                e.printStackTrace();
                connector.rollback(batchId); // 处理失败, 回滚数据
            } finally {
                connector.disconnect();
            }
        }
    }
    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            System.out.println("rowChare ======>"+rowChage.toString());
            EventType eventType = rowChage.getEventType(); //事件類型,比如insert,update,delete
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(),//mysql的my.cnf配置中的log-bin名稱
                    entry.getHeader().getLogfileOffset(), //偏移量
                    entry.getHeader().getSchemaName(),//庫名
                    entry.getHeader().getTableName(), //表名
                    eventType));//事件名
            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }
    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}


canal client运行实例:

empty count : 1
empty count : 2
empty count : 3
empty count : 4

6. 触发数据库变更

创建库:create database canal;

创建表:create table t_canal (id int,name varchar(20),status int);

插入数据:insert into t_canal values(11,'xxiao',1);

canal client输出日志:

================> binlog[mysql-bin.000001:6973] , name[canal,t_canal] , eventType : INSERT
id : 11    update=true
name : xxiao    update=true
status : 1    update=true

7. 其他

将canal client代码CanalConnectors.newClusterConnector(zkHost,"example1","","");中的队列名example1换成example2再执行,就可以监听example2对应数据变化了.


8. 问题:为何设置了数据表的过滤条件,但貌似没有生效?


:首先看文档AdminGuide,了解canal.instance.filter.regex的书写格式。mysql 数据解析关注的表,Perl正则表达式.多个正则之间以逗号(,)分隔,转义符需要双斜杠(\) 。常见例子:

  1. 所有表:.*   or  .\..
  2. canal schema下所有表: canal\..*
  3. canal下的以canal打头的表:canal\.canal.*
  4. canal schema下的一张表:canal.test1
  5. 多个规则组合使用:canal\..*,mysql.test1,mysql.test2 (逗号分隔)


检查binlog格式,过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)。

检查下CanalConnector是否调用subscribe(filter)方法;有的话,filter需要和instance.properties的canal.instance.filter.regex一致,否则subscribe的filter会覆盖instance的配置,如果subscribe的filter是...,那么相当于你消费了所有的更新数据

特别注意


三 运行测试及总结



1. 启动两个监听example1的canal client,启动两个监听example2的canal client:

在example1或example2对应的数据发生变化时,两个canal client只有一个消费消息。

当两个监听同一个队列的canal client有一个宕掉时,再有数据变化时,剩下的一个canal client就会开始消费数据。

这就验证了canal client的HA机制:为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序.


2. 启动两个canal server并在zk上注册

当停掉其中一个canal server时,当产生数据变化时,整个canal server集群仍可以正常对外提供服务。

这就验证了canal server的HA机制:为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态.


3. 在canal server切换过程中,canal client存在重复消费数据的问题

这点需要在消费端自行进行处理。


参考文章:
  1. https://www.2cto.com/database/201609/547661.html
  2. https://www.cnblogs.com/yulu080808/p/8819260.html
  3. https://github.com/alibaba/canal
  4. https://blog.csdn.net/my201110lc/article/details/78836270
相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。 &nbsp; &nbsp; 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
相关文章
|
2月前
|
SQL 监控 关系型数据库
MySQL主从复制:构建高可用架构
本文深入解析MySQL主从复制原理与实战配置,涵盖复制架构、监控管理、高可用设计及性能优化,助你构建企业级数据库高可用方案。
|
3月前
|
监控 Java API
Spring Boot 3.2 结合 Spring Cloud 微服务架构实操指南 现代分布式应用系统构建实战教程
Spring Boot 3.2 + Spring Cloud 2023.0 微服务架构实践摘要 本文基于Spring Boot 3.2.5和Spring Cloud 2023.0.1最新稳定版本,演示现代微服务架构的构建过程。主要内容包括: 技术栈选择:采用Spring Cloud Netflix Eureka 4.1.0作为服务注册中心,Resilience4j 2.1.0替代Hystrix实现熔断机制,配合OpenFeign和Gateway等组件。 核心实操步骤: 搭建Eureka注册中心服务 构建商品
689 3
|
3月前
|
运维 监控 搜索推荐
MSE ZooKeeper:Flink 高可用架构的企业级选择
本文深入解析了 Apache Flink 架构中 ZooKeeper 的核心作用,包括 Leader 选举、Checkpoint 管理、作业协调及配置管理等关键功能,并结合金融风控与电商推荐等典型场景,分析了 ZooKeeper 在实际应用中的技术实现。
|
1月前
|
运维 监控 安全
公链开发中的高可用架构设计要点
本指南提供公链高可用架构的可复用流程与模板,涵盖目标拆解、先决条件、分步执行、故障排查及验收标准,结合跨链DApp与量化机器人案例,提升落地效率与系统稳定性。
|
1月前
|
人工智能 JavaScript 前端开发
GenSX (不一样的AI应用框架)架构学习指南
GenSX 是一个基于 TypeScript 的函数式 AI 工作流框架,以“函数组合替代图编排”为核心理念。它通过纯函数组件、自动追踪与断点恢复等特性,让开发者用自然代码构建可追溯、易测试的 LLM 应用。支持多模型集成与插件化扩展,兼具灵活性与工程化优势。
201 6
|
2月前
|
人工智能 Cloud Native 中间件
划重点|云栖大会「AI 原生应用架构论坛」看点梳理
本场论坛将系统性阐述 AI 原生应用架构的新范式、演进趋势与技术突破,并分享来自真实生产环境下的一线实践经验与思考。
|
2月前
|
存储 监控 NoSQL
Redis高可用架构全解析:从主从复制到集群方案
Redis高可用确保服务持续稳定,避免单点故障导致数据丢失或业务中断。通过主从复制实现数据冗余,哨兵模式支持自动故障转移,Cluster集群则提供分布式数据分片与水平扩展,三者层层递进,保障读写分离、容灾切换与大规模数据存储,构建高性能、高可靠的Redis架构体系。
|
2月前
|
机器学习/深度学习 人工智能 vr&ar
H4H:面向AR/VR应用的NPU-CIM异构系统混合卷积-Transformer架构搜索——论文阅读
H4H是一种面向AR/VR应用的混合卷积-Transformer架构,基于NPU-CIM异构系统,通过神经架构搜索实现高效模型设计。该架构结合卷积神经网络(CNN)的局部特征提取与视觉Transformer(ViT)的全局信息处理能力,提升模型性能与效率。通过两阶段增量训练策略,缓解混合模型训练中的梯度冲突问题,并利用异构计算资源优化推理延迟与能耗。实验表明,H4H在相同准确率下显著降低延迟和功耗,为AR/VR设备上的边缘AI推理提供了高效解决方案。
405 0
|
1月前
|
机器学习/深度学习 自然语言处理 算法
48_动态架构模型:NAS在LLM中的应用
大型语言模型(LLM)在自然语言处理领域的突破性进展,很大程度上归功于其庞大的参数量和复杂的网络架构。然而,随着模型规模的不断增长,计算资源消耗、推理延迟和部署成本等问题日益凸显。如何在保持模型性能的同时,优化模型架构以提高效率,成为2025年大模型研究的核心方向之一。神经架构搜索(Neural Architecture Search, NAS)作为一种自动化的网络设计方法,正在为这一挑战提供创新性解决方案。本文将深入探讨NAS技术如何应用于LLM的架构优化,特别是在层数与维度调整方面的最新进展,并通过代码实现展示简单的NAS实验。
|
3月前
|
Web App开发 Linux 虚拟化
Omnissa Horizon 8 2506 (8.16) - 虚拟桌面基础架构 (VDI) 和应用软件
Omnissa Horizon 8 2506 (8.16) - 虚拟桌面基础架构 (VDI) 和应用软件
240 0
Omnissa Horizon 8 2506 (8.16) - 虚拟桌面基础架构 (VDI) 和应用软件