系统重构数据同步利器之Canal实战篇

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云原生网关 MSE Higress,422元/月
简介: 系统重构数据同步利器之Canal实战篇

一、背景

二话不说,先上图


上图来自于官网(https://github.com/alibaba/canal),基本上涵盖了目前生产环境使用场景了,众所周知,Canal做数据同步已经是行业内标杆了。我们生产环境也用Canal监听binlog数据变更,然后解析成对应数据发送到MQ(RocketMQ)。一些非主流程业务,异步场景消费MQ处理即可。

但是我这篇文章,主要想聊一聊在做系统重构时,新老系统数据双向同步时Canal的使用场景。

注:关于系统重构的介绍我这里就不叙述了,大家可以看我之前写的系列文章:浅谈系统重构


二、关于双向同步

1.什么是双向同步?

所谓双向同步,就是老系统数据库数据往新系统数据库同步,新系统数据库同时也往老系统数据库同步。从而保证新系统,老系统数据库数据完全一致。系统重构时如果上线出现问题,随时能切回原来老系统,这也为灰度方案提供了底层保障。

2.一般同步如何做?各自优缺点是什么?


方案一: Dao层拦截方案

方案说明: 在Dao层打洞拦截所有写请求(insert,update,delete), 然后写入MQ队列,再通过消费MQ队列写入对应数据库。

优点: 这种方案实现比较简单。

缺点: 对于老系统数据库,可能有很多个服务在写入,如果从Dao层拦截,可能要修改很多地方,改动较大。


方案二: 利用Canal订阅解析Binlog

方案说明: 利用Canal订阅Binlog,解析成数据,再写入到对应数据库(这里可以直接写入,也可以先写入MQ,再消费MQ写入,推荐后者)。

优点:能够解决系统多处写入问题。

缺点:引入新的组件Canal,复杂度增加。

下面,我们就来实战操作一下方案二。


三、环境准备(Centos系统为例)

1. 安装Mysql

wget https://dev.mysql.com/get/mysql80-community-release-el8-1.noarch.rpm
yum install  mysql80-community-release-el8-1.noarch.rpm
#禁用centos自带的mysql
yum module disable mysql -y
#安装
yum install mysql-community-server -y
#启动
systemctl start mysqld
#查看启动状态 提升 Active: active (running) 表示成功
systemctl status mysqld
#查看初始密码
grep 'temporary password' /var/log/mysqld.log
#初始密码登录
mysql -uroot -p'AXXXXX'  -hlocalhost -P3306
#修改ROOT密码
ALTER USER 'root'@'localhost' IDENTIFIED BY 'BXXXXX';


2、 环境部署

1)、查看当前mysql是否开启了binlog模式, 如果log_bin的值为OFF是未开启,为ON是已开启 。

SHOW VARIABLES LIKE '%log_bin%'

2)、若未开启需要修改/ect/my.cnf 开启binlog模式

[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

修改完之后重启mysql服务

3)、创建用户并且授权

create user canal@'%' IDENTIFIED by 'XXXX';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;


3、 Canal服务端安装

1)、canal下载地址

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

2)、解压到指定目录

mkdir  canal-server-1.1.4
tar -zxf canal.deployer-1.1.4.tar.gz -C canal-server-1.1.4/

3)、修改配置文件 查看主库 binlog position

mysql> show master status;
+---------------+----------+--------------+------------------+-------------------+
| File          | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+---------------+----------+--------------+------------------+-------------------+
| binlog.000002 |     4526 |              |                  |                   |
+---------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)

修改配置文件 conf/example/instance.properties

# position info
canal.instance.master.address={IP}:3306
# 这里对应上面的File
canal.instance.master.journal.name=binlog.000002
# 这里对应上面的Position
canal.instance.master.position=4526
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=XXXX

4)、启动 canal-server

./bin/startup.sh
# 查看日志
tail -f logs/example/example.log 

以上就完成了Canal-Server的单实例版本实现,生成环境集群环境一般是运维搭建,我们测试就用单实例版本。

关于Canal的HA机制设计下面简单介绍下,生产环境推荐使用。

canal的HA分为两部分,canal server和canal client分别有对应的HA实现


canal server:

为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态.


canal client:

为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。整个HA机制的控制主要是依赖了zookeeper的几个特性,watcher和EPHEMERAL节点(和session生命周期绑定),这里就不展开介绍了,有兴趣的同学可以看下官方wiki。



四、演示环节

由于canal组件封装的代码太多,我花了几个晚上业余时间写的(请点个赞吧),代码已经开源至gitee,有需要的同学可以clone下来看。

gitee地址: https://gitee.com/bytearch/fast-cloud

目前已支持simple直连模式和zookeeper集群模式

下面演示canal-client-demo

1.新建库order_center ,并且创建表order_info

CREATE TABLE `order_info` (
  `order_id` bigint(20) unsigned NOT NULL,
  `user_id` int(11) DEFAULT '0' COMMENT '用户id',
  `status` int(11) DEFAULT '0' COMMENT '订单状态',
  `booking_date` datetime DEFAULT NULL,
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`order_id`),
  KEY `idx_user_id` (`user_id`),
  KEY `idx_bdate` (`booking_date`),
  KEY `idx_ctime` (`create_time`),
  KEY `idx_utime` (`update_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

2.添加处理器handler

@CanalHandler(value = "orderInfoHandler", destination = "example", schema = {"order_center"}, table = {"order_info"}, eventType = {CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT,CanalEntry.EventType.DELETE})
public class OrderHandler implements Handler<CanalEntryBO> {
    @Override
    public boolean beforeHandle(CanalEntryBO canalEntryBO) {
        if (canalEntryBO == null) {
            return false;
        }
        return true;
    }
    @Override
    public void handle(CanalEntryBO canalEntryBO) {
        //1. 更新后数据解析
        OrderInfoDTO orderInfoDTO = CanalAnalysisUti.analysis(OrderInfoDTO.class, canalEntryBO.getRowData().getAfterColumnsList());
        System.out.println("event:" + canalEntryBO.getEventType());
        System.out.println(orderInfoDTO);
        //2. 后续操作 TODO
    }
}

添加配置

canal:
  clients:
    simpleInstance:
      enable: true
      mode: simple
      servers: XXXXX:11111
      batchSize: 1000
      destination: example
      getMessageTimeOutMS: 500
    #zkInstance:
    #   enable: true
    #   mode: zookeeper
    #   servers: 172.30.1.6:2181,172.30.1.7:2181,172.30.1.8:2181
    #   batchSize: 1000
    #   #filter: order_center.order_info
    #   destination: example
    #   getMessageTimeOutMS: 500

配置说明:

public class CanalProperties {
    /**
     * 是否开启 默认不开启
     */
    private boolean enable = false;
    /**
     * 模式
     * zookeeper: zk集群模式
     * simple: 简单直连模式
     */
    private String mode = "simple";
    /**
     * canal-server地址 多个地址逗号隔开
     */
    private String servers;
    /**
     * canal-server 的destination
     */
    private String destination;
    private String username = "";
    private String password = "";
    private int batchSize = 5 * 1024;
    private String filter = StringUtils.EMPTY;
    /**
     * getMessage & handleMessage 的重试次数, 最后一次重试会ack, 之前的重试会rollback
     */
    private int retries = 3;
    /**
     * getMessage & handleMessage 的重试间隔ms
     * canal-client内部代码 的重试间隔ms
     */
    private int retryInterval = 3000;
    private long getMessageTimeOutMS = 1000;

3.测试insert和update操作

mysql> insert into order_info(order_id,user_id,status,booking_date,create_time,update_time) values(6666666,6,10,"2022-02-19 00:00:00","2022-02-19 00:00:00", "2022-02-19 00:00:00");
Query OK, 1 row affected (0.00 sec)
mysql> update order_info set status=20 where order_id=66666;
Query OK, 0 rows affected (0.00 sec)
Rows matched: 0  Changed: 0  Warnings: 0
mysql> 

4.测试结果

2022-02-18 19:29:52.399  INFO 47706 --- [ lc-work-thread] c.b.s.canal.cycle.SimpleCanalLifeCycle   : 
****************************************************
* Batch Id: [11] ,count : [3] , memsize : [189] , Time : 2022-02-18 19:29:52.399
* Start : [binlog.000003:18893:1645183792000(2022-02-18 19:29:52.000)] 
* End : [binlog.000003:19123:1645183792000(2022-02-18 19:29:52.000)] 
****************************************************
2022-02-18 19:29:52.405  INFO 47706 --- [ lc-work-thread] c.b.s.canal.cycle.SimpleCanalLifeCycle   : 
----------------> binlog[binlog.000003:19056] , name[order_center,order_info] , eventType : INSERT ,tableName : order_info, executeTime : 1645183792000 , delay : 400ms
event:INSERT
OrderInfoDTO{orderId=6666666, userId=6, status=10, bookingDate=2022-02-19 00:00:00, createTime=2022-02-19 00:00:00, updateTime=2022-02-19 00:00:00}
  1. 大功告成,到这一步就顺利完成了Canal订阅解析binlog步骤。


五、数据同步注意事项

抛下两个问题大家可以思考下

  1. 1.数据双向同步时,如何解决数据回环问题?
     例如新系统产生的数据,同步到老系统,不能又回流到新系统,如何解决?
  2. 2.数据顺序问题,如果写入到MQ,是否要保证顺序消费?如何实现?
  3. 3.当同步并发比较大,如何提高同步速度。

温馨提示: 此专题未完,以上问题我将在下一篇文章《系统重构数据同步利器之Canal实战篇-续》实现,大家可以提前思考一下。


六、 号外

欢迎大家关注,不定期分享原创文章

有任何问题,欢迎私信我交流。

相关文章
|
4月前
|
供应链 中间件
SAP CRM 和 ERP 系统之间的主数据同步 - PRODUCT_R3_ADAPTER
SAP CRM 和 ERP 系统之间的主数据同步 - PRODUCT_R3_ADAPTER
|
15天前
|
canal 消息中间件 关系型数据库
Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
【9月更文挑战第1天】Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
103 4
|
16天前
|
存储 关系型数据库 MySQL
【TiDB原理与实战详解】5、BR 物理备份恢复与Binlog 数据同步~学不会? 不存在的!
BR(Backup & Restore)是 TiDB 分布式备份恢复的命令行工具,适用于大数据量场景,支持常规备份恢复及大规模数据迁移。BR 通过向各 TiKV 节点下发命令执行备份或恢复操作,生成 SST 文件存储数据信息与 `backupmeta` 文件存储元信息。推荐部署配置包括在 PD 节点部署 BR 工具,使用万兆网卡等。本文介绍 BR 的工作原理、部署配置、使用限制及多种备份恢复方式,如全量备份、单库/单表备份、过滤备份及增量备份等。
|
16天前
|
关系型数据库 MySQL 调度
【TiDB原理与实战详解】4、DM 迁移和TiCDC数据同步~学不会? 不存在的!
TiDB Data Migration (DM) 和 TiCDC 是两款用于数据库迁移和同步的强大工具。DM 支持将兼容 MySQL 协议的数据库(如 MySQL、MariaDB)的数据异步迁移到 TiDB 中,具备全量和增量数据传输能力,并能合并分库分表的数据。TiCDC 则专注于 TiDB 的增量同步,利用 TiKV 日志实现高可用性和水平扩展,支持多种下游系统和输出格式。两者均可通过 TiUP 工具进行部署与管理,简化了集群的安装、配置及任务管理过程。
|
28天前
|
canal 关系型数据库 MySQL
"揭秘阿里数据同步黑科技Canal:从原理到实战,手把手教你玩转MySQL数据秒级同步,让你的数据处理能力瞬间飙升,成为技术界的新晋网红!"
【8月更文挑战第18天】Canal是一款由阿里巴巴开源的高性能数据同步系统,它通过解析MySQL的增量日志(Binlog),提供低延迟、可靠的数据订阅和消费功能。Canal模拟MySQL Slave与Master间的交互协议来接收并解析Binary Log,支持数据的增量同步。配置简单直观,包括Server和Instance两层配置。在实战中,Canal可用于数据库镜像、实时备份等多种场景,通过集成Canal Client可实现数据的消费和处理,如更新缓存或写入消息队列。
229 0
|
2月前
|
运维 DataWorks 安全
DataWorks产品使用合集之在进行数据同步时,如何处理源系统表名不固定的情况
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
canal 关系型数据库 MySQL
Canal数据同步工具
Canal数据同步工具
125 2
|
10月前
|
消息中间件 NoSQL 关系型数据库
Canal+Kafka实现MySQL与Redis数据同步(二)
Canal+Kafka实现MySQL与Redis数据同步
192 0
|
10月前
|
消息中间件 canal NoSQL
Canal+Kafka实现MySQL与Redis数据同步(一)
Canal+Kafka实现MySQL与Redis数据同步
426 0
|
28天前
|
SQL DataWorks 关系型数据库
DataWorks操作报错合集之如何处理数据同步时(mysql->hive)报:Render instance failed
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。

热门文章

最新文章