实战!Spring Boot 整合 阿里开源中间件 Canal 实现数据增量同步!

本文涉及的产品
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
性能测试 PTS,5000VUM额度
简介: 实战!Spring Boot 整合 阿里开源中间件 Canal 实现数据增量同步!

大家好,我是不才陈某~

数据同步一直是一个令人头疼的问题。在业务量小,场景不多,数据量不大的情况下我们可能会选择在项目中直接写一些定时任务手动处理数据,例如从多个表将数据查出来,再汇总处理,再插入到相应的地方。

但是随着业务量增大,数据量变多以及各种复杂场景下的分库分表的实现,使数据同步变得越来越困难。

今天这篇文章使用阿里开源的中间件Canal解决数据增量同步的痛点。

文章目录如下:

Canal是什么?

canal译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

从这句话理解到了什么?

基于MySQL,并且通过MySQL日志进行的增量解析,这也就意味着对原有的业务代码完全是无侵入性的。

工作原理:解析MySQL的binlog日志,提供增量数据。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

官方文档:https://github.com/alibaba/canal

Canal数据如何传输?

先来一张官方图:

Canal分为服务端和客户端,这也是阿里常用的套路,比如前面讲到的注册中心Nacos

  • 服务端:负责解析MySQL的binlog日志,传递增量数据给客户端或者消息中间件
  • 客户端:负责解析服务端传过来的数据,然后定制自己的业务处理。

目前为止支持的消息中间件很全面了,比如KafkaRocketMQRabbitMQ

数据同步还有其他中间件吗?

有,当然有,还有一些开源的中间件也是相当不错的,比如Bifrost

常见的几款中间件的区别如下:

当然要我选择的话,首选阿里的中间件Canal。

Canal服务端安装

服务端需要下载压缩包,下载地址:https://github.com/alibaba/canal/releases

目前最新的是v1.1.5,点击下载:

下载完成解压,目录如下:

本文使用Canal+RabbitMQ进行数据的同步,因此下面步骤完全按照这个base进行。

1、打开MySQL的binlog日志

修改MySQL的日志文件,my.cnf 配置如下:

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

2、设置MySQL的配置

需要设置服务端配置文件中的MySQL配置,这样Canal才能知道需要监听哪个库、哪个表的日志文件。

一个 Server 可以配置多个实例监听 ,Canal 功能默认自带的有个 example 实例,本篇就用 example 实例 。如果增加实例,复制 example 文件夹内容到同级目录下,然后在 canal.properties 指定添加实例的名称。

修改canal.deployer-1.1.5\conf\example\instance.properties配置文件

# url
canal.instance.master.address=127.0.0.1:3306
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
# 监听的数据库
canal.instance.defaultDatabaseName=test
# 监听的表,可以指定,多个用逗号分割,这里正则是监听所有
canal.instance.filter.regex=.*\\..*

3、设置RabbitMQ的配置

服务端默认的传输方式是tcp,需要在配置文件中设置MQ的相关信息。

这里需要修改两处配置文件,如下;

1、canal.deployer-1.1.5\conf\canal.properties

这个配置文件主要是设置MQ相关的配置,比如URL,用户名、密码...

# 传输方式:tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rabbitMQ
##################################################
#########         RabbitMQ       #############
##################################################
rabbitmq.host = 127.0.0.1
rabbitmq.virtual.host =/
# exchange
rabbitmq.exchange =canal.exchange
# 用户名、密码
rabbitmq.username =guest
rabbitmq.password =guest
## 是否持久化
rabbitmq.deliveryMode = 2

2、canal.deployer-1.1.5\conf\example\instance.properties

这个文件设置MQ的路由KEY,这样才能路由到指定的队列中,如下:

canal.mq.topic=canal.routing.key

4、RabbitMQ新建exchange和Queue

在RabbitMQ中需要新建一个canal.exchange(必须和配置中的相同)的exchange和一个名称为 canal.queue(名称随意)的队列。

其中绑定的路由KEY为:canal.routing.key(必须和配置中的相同),如下图:

5、启动服务端

点击bin目录下的脚本,windows直接双击startup.bat,启动成功如下:

6、测试

在本地数据库test中的oauth_client_details插入一条数据,如下:

INSERT INTO `oauth_client_details` VALUES ('myjszl', 'res1', '$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W', 'all', 'password,refresh_token,authorization_code,client_credentials,implicit', 'http://www.baidu.com', NULL, 1000, 1000, NULL, 'false');

此时查看MQ中的canal.queue已经有了数据,如下:

其实就是一串JSON数据,这个JSON如下:

{
 "data": [{
  "client_id": "myjszl",
  "resource_ids": "res1",
  "client_secret": "$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W",
  "scope": "all",
  "authorized_grant_types": "password,refresh_token,authorization_code,client_credentials,implicit",
  "web_server_redirect_uri": "http://www.baidu.com",
  "authorities": null,
  "access_token_validity": "1000",
  "refresh_token_validity": "1000",
  "additional_information": null,
  "autoapprove": "false"
 }],
 "database": "test",
 "es": 1640337532000,
 "id": 7,
 "isDdl": false,
 "mysqlType": {
  "client_id": "varchar(48)",
  "resource_ids": "varchar(256)",
  "client_secret": "varchar(256)",
  "scope": "varchar(256)",
  "authorized_grant_types": "varchar(256)",
  "web_server_redirect_uri": "varchar(256)",
  "authorities": "varchar(256)",
  "access_token_validity": "int(11)",
  "refresh_token_validity": "int(11)",
  "additional_information": "varchar(4096)",
  "autoapprove": "varchar(256)"
 },
 "old": null,
 "pkNames": ["client_id"],
 "sql": "",
 "sqlType": {
  "client_id": 12,
  "resource_ids": 12,
  "client_secret": 12,
  "scope": 12,
  "authorized_grant_types": 12,
  "web_server_redirect_uri": 12,
  "authorities": 12,
  "access_token_validity": 4,
  "refresh_token_validity": 4,
  "additional_information": 12,
  "autoapprove": 12
 },
 "table": "oauth_client_details",
 "ts": 1640337532520,
 "type": "INSERT"
}

每个字段的意思已经很清楚了,有表名称、方法、参数、参数类型、参数值.....

客户端要做的就是监听MQ获取JSON数据,然后将其解析出来,处理自己的业务逻辑。

Canal客户端搭建

客户端很简单实现,要做的就是消费Canal服务端传递过来的消息,监听canal.queue这个队列。

1、创建消息实体类

MQ传递过来的是JSON数据,当然要创建个实体类接收数据,如下:

/**
 * @author 公众号 码猿技术专栏
 * Canal消息接收实体类
 */
@NoArgsConstructor
@Data
public class CanalMessage<T> {
    @JsonProperty("type")
    private String type;
    @JsonProperty("table")
    private String table;
    @JsonProperty("data")
    private List<T> data;
    @JsonProperty("database")
    private String database;
    @JsonProperty("es")
    private Long es;
    @JsonProperty("id")
    private Integer id;
    @JsonProperty("isDdl")
    private Boolean isDdl;
    @JsonProperty("old")
    private List<T> old;
    @JsonProperty("pkNames")
    private List<String> pkNames;
    @JsonProperty("sql")
    private String sql;
    @JsonProperty("ts")
    private Long ts;
}

2、MQ消息监听业务

接下来就是监听队列,一旦有Canal服务端有数据推送能够及时的消费。

代码很简单,只是给出个接收的案例,具体的业务逻辑可以根据业务实现,如下:

import cn.hutool.json.JSONUtil;
import cn.myjszl.middle.ware.canal.mq.rabbit.model.CanalMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * 监听MQ获取Canal增量的数据消息
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class CanalRabbitMQListener {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "canal.queue", durable = "true"),
                    exchange = @Exchange(value = "canal.exchange"),
                    key = "canal.routing.key"
            )
    })
    public void handleDataChange(String message) {
        //将message转换为CanalMessage
        CanalMessage canalMessage = JSONUtil.toBean(message, CanalMessage.class);
        String tableName = canalMessage.getTable();
        log.info("Canal 监听 {} 发生变化;明细:{}", tableName, message);
        //TODO 业务逻辑自己完善...............
    }
}

3、测试

下面向表中插入数据,看下接收的消息是什么样的,SQL如下:

INSERT INTO `oauth_client_details`
VALUES
 ( 'myjszl', 'res1', '$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W', 'all', 'password,refresh_token,authorization_code,client_credentials,implicit', 'http://www.baidu.com', NULL, 1000, 1000, NULL, 'false' );

客户端转换后的消息如下图:

上图可以看出所有的数据都已经成功接收到,只需要根据数据完善自己的业务逻辑即可。

客户端案例源码已经上传GitHub,关注公众号:码猿技术专栏,回复关键词:9530 获取!

总结

数据增量同步的开源工具并不只有Canal一种,根据自己的业务需要选择合适的组件。

相关文章
|
14天前
|
前端开发 Java API
SpringBoot整合Flowable【06】- 查询历史数据
本文介绍了Flowable工作流引擎中历史数据的查询与管理。首先回顾了流程变量的应用场景及其局限性,引出表单在灵活定制流程中的重要性。接着详细讲解了如何通过Flowable的历史服务API查询用户的历史绩效数据,包括启动流程、执行任务和查询历史记录的具体步骤,并展示了如何将查询结果封装为更易理解的对象返回。最后总结了Flowable提供的丰富API及其灵活性,为后续学习驳回功能做了铺垫。
26 0
SpringBoot整合Flowable【06】- 查询历史数据
|
3月前
|
人工智能 自然语言处理 前端开发
SpringBoot + 通义千问 + 自定义React组件:支持EventStream数据解析的技术实践
【10月更文挑战第7天】在现代Web开发中,集成多种技术栈以实现复杂的功能需求已成为常态。本文将详细介绍如何使用SpringBoot作为后端框架,结合阿里巴巴的通义千问(一个强大的自然语言处理服务),并通过自定义React组件来支持服务器发送事件(SSE, Server-Sent Events)的EventStream数据解析。这一组合不仅能够实现高效的实时通信,还能利用AI技术提升用户体验。
310 2
|
14天前
|
存储 前端开发 Java
SpringBoot整合Flowable【05】- 使用流程变量传递业务数据
本文介绍了如何使用Flowable的流程变量来管理绩效流程中的自定义数据。首先回顾了之前的简单绩效流程,指出现有流程缺乏分数输入和保存步骤。接着详细解释了流程变量的定义、分类(运行时变量和历史变量)及类型。通过具体代码示例展示了如何在绩效流程中插入全局和局部流程变量,实现各节点打分并维护分数的功能。最后总结了流程变量的使用场景及其在实际业务中的灵活性,并承诺将持续更新Flowable系列文章,帮助读者更好地理解和应用Flowable。 简要来说,本文通过实例讲解了如何利用Flowable的流程变量功能优化绩效评估流程,确保每个环节都能记录和更新分数,同时提供了全局和局部变量的对比和使用方法。
45 0
|
1月前
|
存储 NoSQL 架构师
阿里面试:聊聊 CAP 定理?哪些中间件是AP?为什么?
本文深入探讨了分布式系统中的“不可能三角”——CAP定理,即一致性(C)、可用性(A)和分区容错性(P)三者无法兼得。通过实例分析了不同场景下如何权衡CAP,并介绍了几种典型分布式中间件的CAP策略,强调了理解CAP定理对于架构设计的重要性。
94 4
|
2月前
|
SQL 前端开发 关系型数据库
SpringBoot使用mysql查询昨天、今天、过去一周、过去半年、过去一年数据
SpringBoot使用mysql查询昨天、今天、过去一周、过去半年、过去一年数据
90 9
|
3月前
|
SQL JSON Java
mybatis使用三:springboot整合mybatis,使用PageHelper 进行分页操作,并整合swagger2。使用正规的开发模式:定义统一的数据返回格式和请求模块
这篇文章介绍了如何在Spring Boot项目中整合MyBatis和PageHelper进行分页操作,并且集成Swagger2来生成API文档,同时定义了统一的数据返回格式和请求模块。
122 1
mybatis使用三:springboot整合mybatis,使用PageHelper 进行分页操作,并整合swagger2。使用正规的开发模式:定义统一的数据返回格式和请求模块
|
2月前
|
存储 easyexcel Java
SpringBoot+EasyExcel轻松实现300万数据快速导出!
本文介绍了在项目开发中使用Apache POI进行数据导入导出的常见问题及解决方案。首先比较了HSSFWorkbook、XSSFWorkbook和SXSSFWorkbook三种传统POI版本的优缺点,然后根据数据量大小推荐了合适的使用场景。接着重点介绍了如何使用EasyExcel处理超百万数据的导入导出,包括分批查询、分批写入Excel、分批插入数据库等技术细节。通过测试,300万数据的导出用时约2分15秒,导入用时约91秒,展示了高效的数据处理能力。最后总结了公司现有做法的不足,并提出了改进方向。
|
3月前
|
自然语言处理 Java API
Spring Boot 接入大模型实战:通义千问赋能智能应用快速构建
【10月更文挑战第23天】在人工智能(AI)技术飞速发展的今天,大模型如通义千问(阿里云推出的生成式对话引擎)等已成为推动智能应用创新的重要力量。然而,对于许多开发者而言,如何高效、便捷地接入这些大模型并构建出功能丰富的智能应用仍是一个挑战。
531 6
|
3月前
|
Web App开发 JavaScript Java
elasticsearch学习五:springboot整合 rest 操作elasticsearch的 实际案例操作,编写搜索的前后端,爬取京东数据到elasticsearch中。
这篇文章是关于如何使用Spring Boot整合Elasticsearch,并通过REST客户端操作Elasticsearch,实现一个简单的搜索前后端,以及如何爬取京东数据到Elasticsearch的案例教程。
296 0
elasticsearch学习五:springboot整合 rest 操作elasticsearch的 实际案例操作,编写搜索的前后端,爬取京东数据到elasticsearch中。
|
3月前
|
前端开发 Java 数据库
springBoot:template engine&自定义一个mvc&后端给前端传数据&增删改查 (三)
本文介绍了如何自定义一个 MVC 框架,包括后端向前端传递数据、前后端代理配置、实现增删改查功能以及分页查询。详细展示了代码示例,从配置文件到控制器、服务层和数据访问层的实现,帮助开发者快速理解和应用。