详解 canal 同步 MySQL 增量数据到 ES

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: canal 是阿里知名的开源项目,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。这篇文章,我们手把手向同学们展示**使用 canal 将 MySQL 增量数据同步到 ES**

canal 是阿里知名的开源项目,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

这篇文章,我们手把手向同学们展示使用 canal 将 MySQL 增量数据同步到 ES

1 集群模式

图中 server 对应一个 canal 运行实例 ,对应一个 JVM 。

server 中包含 1..n 个 instance , 我们可以将 instance 理解为配置任务

instance 包含如下模块 :

  • eventParser

    数据源接入,模拟 slave 协议和 master 进行交互,协议解析

  • eventSink

    Parser 和 Store 链接器,进行数据过滤,加工,分发的工作

  • eventStore

    数据存储

  • metaManager

    增量订阅 & 消费信息管理器

真实场景中,canal 高可用依赖 zookeeper ,笔者将客户端模式可以简单划分为:TCP 模式MQ 模式

实战中我们经常会使用 MQ 模式 。因为 MQ 模式的优势在于解耦 ,canal server 将数据变更信息发送到消息队列 kafka 或者 RocketMQ ,消费者消费消息,顺序执行相关逻辑即可。

顺序消费

对于指定的一个 Topic ,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。

2 MySQL配置

1、对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步。

2、授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant 。

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

3、创建数据库商品表 t_product

CREATE TABLE `t_product` (
    `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
    `name` VARCHAR ( 255 ) COLLATE utf8mb4_bin NOT NULL,
    `price` DECIMAL ( 10, 2 ) NOT NULL,
    `status` TINYINT ( 4 ) NOT NULL,
    `create_time` datetime NOT NULL,
    `update_time` datetime NOT NULL,
   PRIMARY KEY ( `id` ) 
) ENGINE = INNODB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin

3 Elasticsearch配置

使用 Kibana 创建商品索引

PUT /t_product
{
   
   
    "settings": {
   
   
        "number_of_shards": 2,
        "number_of_replicas": 1
    },
    "mappings": {
   
   
            "properties": {
   
   
               "id": {
   
   
                    "type":"keyword"
                },
                "name": {
   
   
                    "type":"text"
                },
                "price": {
   
   
                    "type":"double"
                },
                "status": {
   
   
                    "type":"integer"
                },
                "createTime": {
   
   
                    "type": "date",
                    "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                },
                "updateTime": {
   
   
                    "type": "date",
                    "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                }
        }
    }
}

执行完成,如图所示 :

4 RocketMQ 配置

创建主题:product-syn-topic ,canal 会将 Binlog 的变化数据发送到该主题。

5 canal 配置

我们选取 canal 版本 1.1.6 ,进入 conf 目录。

1、配置 canal.properties

#集群模式 zk地址
canal.zkServers = localhost:2181
#本质是MQ模式和tcp模式 tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = rocketMQ
#instance 列表
canal.destinations = product-syn
#conf root dir
canal.conf.dir = ../conf
#全局的spring配置方式的组件文件 生产环境,集群化部署
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

######  以下部分是默认值 展示出来 
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为 flat json格式对象
canal.mq.flatMessage = true

2、instance 配置文件

conf 目录下创建实例目录 product-syn , 在 product-syn 目录创建配置文件 :instance.properties

#  按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...

# table regex 
canal.instance.filter.regex=mytest.t_product

# mq config
canal.mq.topic=product-syn-topic
# 针对库名或者表名发送动态topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#库名.表名: 唯一主键,多个表之间用逗号分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################

3、服务启动

启动两个 canal 服务,我们从 zookeeper gui 中查看服务运行情况 。

修改一条 t_product 表记录,可以从 RocketMQ 控制台中观测到新的消息。

6 消费者

1、产品索引操作服务

2、消费监听器

消费者逻辑重点有两点:

  • 顺序消费监听器
  • 将消息数据转换成 JSON 字符串,从 data 节点中获取表最新数据(批量操作可能是多条)。然后根据操作类型 UPDATEINSERTDELETE 执行产品索引操作服务的方法。

7 写到最后

canal 是一个非常有趣的开源项目,很多公司使用 canal 构建数据传输服务( Data Transmission Service ,简称 DTS ) 。

推荐大家阅读这个开源项目,你可以从中学习到网络编程、多线程模型、高性能队列 Disruptor、 流程模型抽象等。

这篇文章涉及到的代码已收录到下面的工程中,有兴趣的同学可以一看。

https://github.com/makemyownlife/rocketmq4-learning


如果我的文章对你有所帮助,还请帮忙点赞、在看、转发一下,你的支持会激励我输出更高质量的文章,非常感谢!

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
9天前
|
安全 关系型数据库 MySQL
如何将数据从MySQL同步到其他系统
【10月更文挑战第17天】如何将数据从MySQL同步到其他系统
65 0
|
2月前
|
canal 消息中间件 关系型数据库
Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
【9月更文挑战第1天】Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
481 4
|
24天前
|
SQL 存储 关系型数据库
Mysql主从同步 清理二进制日志的技巧
Mysql主从同步 清理二进制日志的技巧
20 1
|
2月前
|
消息中间件 canal 关系型数据库
Maxwell:binlog 解析器,轻松同步 MySQL 数据
Maxwell:binlog 解析器,轻松同步 MySQL 数据
209 11
|
18天前
|
存储 SQL 关系型数据库
Mysql学习笔记(二):数据库命令行代码总结
这篇文章是关于MySQL数据库命令行操作的总结,包括登录、退出、查看时间与版本、数据库和数据表的基本操作(如创建、删除、查看)、数据的增删改查等。它还涉及了如何通过SQL语句进行条件查询、模糊查询、范围查询和限制查询,以及如何进行表结构的修改。这些内容对于初学者来说非常实用,是学习MySQL数据库管理的基础。
78 6
|
16天前
|
存储 关系型数据库 MySQL
Mysql(4)—数据库索引
数据库索引是用于提高数据检索效率的数据结构,类似于书籍中的索引。它允许用户快速找到数据,而无需扫描整个表。MySQL中的索引可以显著提升查询速度,使数据库操作更加高效。索引的发展经历了从无索引、简单索引到B-树、哈希索引、位图索引、全文索引等多个阶段。
50 3
Mysql(4)—数据库索引
|
18天前
|
SQL Ubuntu 关系型数据库
Mysql学习笔记(一):数据库详细介绍以及Navicat简单使用
本文为MySQL学习笔记,介绍了数据库的基本概念,包括行、列、主键等,并解释了C/S和B/S架构以及SQL语言的分类。接着,指导如何在Windows和Ubuntu系统上安装MySQL,并提供了启动、停止和重启服务的命令。文章还涵盖了Navicat的使用,包括安装、登录和新建表格等步骤。最后,介绍了MySQL中的数据类型和字段约束,如主键、外键、非空和唯一等。
57 3
Mysql学习笔记(一):数据库详细介绍以及Navicat简单使用
|
1天前
|
关系型数据库 MySQL Linux
在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,包括准备工作、下载源码、编译安装、配置 MySQL 服务、登录设置等。
本文介绍了在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,包括准备工作、下载源码、编译安装、配置 MySQL 服务、登录设置等。同时,文章还对比了编译源码安装与使用 RPM 包安装的优缺点,帮助读者根据需求选择最合适的方法。通过具体案例,展示了编译源码安装的灵活性和定制性。
14 2
|
4天前
|
存储 关系型数据库 MySQL
MySQL vs. PostgreSQL:选择适合你的开源数据库
在众多开源数据库中,MySQL和PostgreSQL无疑是最受欢迎的两个。它们都有着强大的功能、广泛的社区支持和丰富的生态系统。然而,它们在设计理念、性能特点、功能特性等方面存在着显著的差异。本文将从这三个方面对MySQL和PostgreSQL进行比较,以帮助您选择更适合您需求的开源数据库。
19 4
|
9天前
|
存储 关系型数据库 MySQL
如何在MySQL中创建数据库?
【10月更文挑战第16天】如何在MySQL中创建数据库?