手把手告诉你如何监听 MySQL binlog 实现数据变化后的实时通知!

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
日志服务 SLS,月写入数据量 50GB 1个月
简介: Hello 大家好,我是阿粉。不知道大家在日常的工作中有没有遇到这样的场景,很多时候业务数据有变更需要及时加载到缓存、ES 或者发送到消息队列中通知下游服务。

Hello 大家好,我是阿粉。不知道大家在日常的工作中有没有遇到这样的场景,很多时候业务数据有变更需要及时加载到缓存、ES 或者发送到消息队列中通知下游服务。

一般遇到这种情况下,在实时性要求不高的场景我们有两种处理模式,一种是写任务定时推送数据同步到缓存中,另一个是下游服务定时自动拉取。这两种模式都依赖服务自己的定时周期时间,很多时候不好设定具体要多久执行一次,定时时间太短在数据没有变化的时候会有很多无效的操作,如果定时时间太长可能很多时候数据的延迟会比较大,某些时候影响也不好。

那有没有一种比较好的方式可以解决这个问题呢?答案当然是肯定的。今天就给大家介绍一下 Canal,基于 MySQLbin log 日志来实时监听数据变化。

什么是 Canal

官方的解释是:canal,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

通过官方的解释我们看到,是针对 MySQL 数据库增量日志解析的,MySQL 的日志是通过 bin log 的形式存储的二进制文件,提供数据订阅和消费就是说提供对二进制文件数据的监听。当日志数据发生变化的时候就会被监听到,从而程序就可以实时获取到有变化的数据。拿到变化的数据后就可以更新进缓存,ES 或发送到消息队列中通知下游服务了。

原理

6.jpg

上面介绍了 Canal 的基本概念,现在我们看看 Canal是怎么实现的,我们都知道 MySQL 是支持主从同步的,而且 Slave 也是通过 bin log 日志的形式同步 master 实例数据的。所以 Canal 就巧妙的运用了这个原理,把自己模拟成一个 Slave,给 MySQLmaster 发送 dump  协议,当 master 接受到 dump 协议的时候就以为 Canal 是一个 Slave 就会推送 bin logCanal

使用方式

开启 MySQL 的binlog

MySQL 的安装阿粉这里就不演示了,网上的文章一大把,大家可以自己去研究安装,要是 macOS 的话,终端里面输入brew install mysql 坐等搞定。

安装完成过后我们看下是否开启了 bin log ,如果没有开启则修改 my.cnf 增加 log-bin=mysql-bin  即可开启。输入命令mysql> show variables like 'log_bin'; 从图中我们可以看到阿粉这里是开启了 bin log 日志的。

7.jpg

接下来我们创建一个 canal 的账号,用于 canal 使用。我们创建一个 canal的账号,同时密码也是 canal

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

安装 Canal

这里我们安装 1.1.5 的版本,可以直接 wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz 也可以在 GitHub 上面直接下载。

8.jpg

下载完解压后目录如图,我们需要修改配置文件,将账号密码以及 bin log 文件名配上

9.jpg

配置完成过后,通过 bin 目录下的脚本进行启动,并且通过日志我们可以看到启动成功。

10.jpg

服务端启动成功后,我们就需要使用客户端去获取数据了,这里我们可以参考 CanalGitHub 官网中提供的 example 样例去进行模拟。

这里有个坑大家要注意下,如果 MySQL 的版本是 8.0 以下应该没有这个问题,如果是 8.0 版本的,我们通过查看tail -f example.log 日志会发现如下异常Caused by: java.io.IOException: caching_sha2_password Auth failed

11.jpg

阿粉这里就遇到了,经过在官方 GitHub 上面的 issue 中,如果搜索到相关的错误信息 https://github.com/alibaba/canal/issues/1700,里面有大佬给了解决方案,在 MySQL 中执行如下命令即可解决

ALTER USER 'canal'@'%' IDENTIFIED BY 'canal' PASSWORD EXPIRE NEVER;
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
FLUSH PRIVILEGES;

如果没有遇到这个问题的小伙伴就可以直接忽略,接下来我们通过官方源码中的 example 示例来测试功能。把源码下载下来后找到com.alibaba.otter.canal.example.SimpleCanalClientTest 类,正常来说不需要修改什么内容,如果密码有变化的话这里可以调整,然后直接运行 main 函数即可。这个时候 MySQL,Canal,以及我们的测试类都已经启动了,下面通过执行 SQL 来创建数据库和表以及插入相应的数据,观察控制台的输出情况。

数据变更

创建数据库

mysql> create database canal_test;
Query OK, 1 row affected (0.01 sec)
mysql> use canal_test;
Database changed
mysql> show tables;
Empty set (0.00 sec)

我们通过语句create database canal_test; 创建了数据库过后,可以看到控制有如下输出,已经监听到了 bin log 的变化了。

12.jpg

创建测试表

再执行如下语句创建数据表

CREATE TABLE `example` 
(
    `id` INT(11) NOT NULL
    ,`username` VARCHAR(32) DEFAULT NULL COMMENT '用户名称'
    ,` age` INT(11) DEFAULT 0 COMMENT '用户年龄'
    ,` sex` INT(11) DEFAULT 0 COMMENT '用户性别 0 男 1 女'
    ,PRIMARY KEY (`id`)
)
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户信息表';

13.jpg

可以看到成功的监听到了数据表的创建,接下来我们再试试插入数据和更新数据

## 插入语句
INSERT INTO example VALUES(1,'张三', 18,0),(2,'李四', 19,0),(3,'王五', 20,1);
## 更新语句
update example set username = '张小三' where id = 1;

14.jpg

从上图中我们可以看到插入的数据以及更新的数据都被实时的监听到了。监听到数据过后,我们就可以根据事件类型以及相应的库和表名来进行过滤操作了。对了,我们可以通过配置 filter 来过滤需要监听的数据库和数据表或者字段,这个都是可以实现的,避免无用的数据变更带来的影响。

对于访问 GitHub 很慢的小伙伴,阿粉已经帮大家把 Canal 的压缩包下载好了,公众号回复【canal】即可获取网盘地址。

总结

今天的文章给大家分享了 Canal 的使用,感兴趣的小伙伴可以自己去试试,如果需要的话,可以在项目中用起来,会事半功倍。

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
3天前
|
SQL 运维 关系型数据库
|
29天前
|
SQL 关系型数据库 MySQL
使用mysql数据库的binlog应对故障
【6月更文挑战第1天】本文介绍`mysql的 binlog`工具用于解析MySQL的二进制日志,转换为可执行的SQL语句,主要用于数据库主从复制和增量恢复。定期备份和binlog推送能实现故障时的数据恢复。
55 9
使用mysql数据库的binlog应对故障
|
15天前
|
Oracle 关系型数据库 MySQL
实时计算 Flink版产品使用问题之假如mysql的binlog有很多个文件,按什么顺序扫描
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4天前
|
存储 关系型数据库 分布式数据库
PolarDB产品使用问题之可以通过什么操作来监听从库的binlog
PolarDB产品使用合集涵盖了从创建与管理、数据管理、性能优化与诊断、安全与合规到生态与集成、运维与支持等全方位的功能和服务,旨在帮助企业轻松构建高可用、高性能且易于管理的数据库环境,满足不同业务场景的需求。用户可以通过阿里云控制台、API、SDK等方式便捷地使用这些功能,实现数据库的高效运维与持续优化。
|
13天前
|
关系型数据库 MySQL
蓝易云 - MySQL自动删除binlog日志
注意,这个参数只影响新的binlog文件。如果你的服务器上已经有超过7天的日志文件,你需要手动删除它们,或者使用PURGE BINARY LOGS命令来删除它们。
12 0
|
2月前
|
SQL 资源调度 关系型数据库
实时计算 Flink版产品使用合集之在抓取 MySQL binlog 数据时,datetime 字段会被自动转换为时间戳形式如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
33 2
|
2月前
|
存储 关系型数据库 MySQL
实时计算 Flink版产品使用合集之如何配置可以实现实时同步多张MySQL源表时只读取一次binlog
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3天前
|
存储 关系型数据库 MySQL
|
2天前
|
存储 关系型数据库 MySQL
|
2天前
|
存储 SQL 关系型数据库