采用canal搭建MySQL到ES数据传输通道

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
数据传输服务 DTS,数据迁移 small 3个月
推荐场景:
MySQL数据库上云
简介: 日常项目实践过程中,经常需要将数据从MySQL关系型数据库同步到Elasticsearch中进行多维度查询,满足查询性能要求或多查询条件检索要求。本文总结数据传输技术的一个解决方案供大家参考使用。

1、功能及使用场景

1.1、功能介绍

     canal是阿里巴巴开源的mysql数据传输组件,基于mysql binlog,提供了准确、实时的数据传输服务。有关binlog介绍,参见binlog介绍

以下来自官方GitHub介绍。GitHub地址

image.png

     canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

    早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

1.2、使用场景

根据官方文档,目标系统支持MySQL、kafka、elasticsearch、hbase、rocketMQ、pulsar等。

本文主要介绍MySQL同步到Elasticsearch的使用。


2、需求引入

整个过程用一个示例来介绍canal的安装使用。

存在如下两张表(一对多关系),用户信息表和用户权限表,需要将用户信息以及权限同步到Elasticsearch

user_info(用户信息表):                                                                          

id

name

role_id

1 张三 1
2 李四 1
3 王大锤 2

role(权限表):

role_id

role_name

1 管理员
2 测试员

                           

查询sql:

SELECT   a.idAS _id,  a.name,  a.role_id,  b.role_nameFROMuser_info a
LEFT JOIN role b
ONb.role_id= a.role_id

查询结果:

_id

name

role_id

role_name

1 张三 1 管理员
2 李四 1 管理员
3 王大锤 2 测试员

对应的es索引结构:

{
"user_index": {
"mappings": {
"_doc": {
"properties": {
"name": {
"type": "text"                    },
"role_id": {
"type": "long"                    },
"role_name": {
"type": "text"                    }
                }
            }
        }
    }
}


3、canal文件下载及准备

     canal分为canal服务端(canal.deployer-1.1.4.tar.gz)和客户端(canal.adapter-1.1.4.tar.gz),分别安装部署。

3.1 下载文件

官方GitHub下载地址:https://github.com/alibaba/canal/releases

下载canal服务端(canal.deployer-1.1.4.tar.gz)和客户端(canal.adapter-1.1.4.tar.gz),如下图。

image.png

下载完成如下

image.png

3.2准备文件

执行如下命令:

image.png

命令运行完成后,进入adapter和deployer可以看到如下结构(忽略我本机的.DS_Store)

adapter:

image.png

deployer:

image.png

3.3 deployer 配置修改

3.3.1 准备:

(针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步)

  • 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
[mysqld]log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
  • 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant,下面新建了canal账号
CREATE USER canal IDENTIFIED BY'canal';GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON*.* TO 'canal'@'%';-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;FLUSH PRIVILEGES;

3.3.2 修改连接数据库信息

vi conf/example/instance.properties

修改如下标红信息

image.png

4、deployer安装及效果测试

参考官方文档地址:https://github.com/alibaba/canal/wiki/QuickStart

4.1 启动 deployer

在deployer目录运行启动脚本

sh bin/startup.sh

image.png

查看 server 日志

tail -f logs/canal/canal.log

image.png

查看 instance 的日志

tail -f logs/example/example.log

image.png

看到如上日志,标志启动成功。

4.2 测试deployer效果

(4.2 步骤可跳过,主要为验证deployer端效果)

4.2.1 在本地电脑新建普通maven工程

image.png

pom文件,依赖如下pom

<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version></dependency>

image.png

4.2.2 新建ClientSimple类

粘贴测试代码(下面链接页面上的ClientSimple代码)

https://github.com/alibaba/canal/wiki/ClientExample


将圈红ip改为部署adapter的ip,然后直接启动此main方法

image.png

启动完成看到如下日志:

image.png

日志会循环打印count

此时触发数据库变更

user_info表变更前

image.png

user_info表变更后,新加了一条名叫逻辑的记录

image.png

可在日志处观察到变更信息,标志着deployer监听mysql binlog变更成功

image.png

5、adapter安装及效果测试

canal adapter 的 Elastic Search 版本支持6.x.x以上

官方文档地址

https://github.com/alibaba/canal/wiki/Sync-ES

5.1修改配置

5.1.1 修改启动器配置: application.yml

进adapter/conf目录

vi  application.yml

修改如下配置,注意缩进格式,yml文件严格缩进格式,格式错误会引起启动失败问题。

image.png

mode使用rest模式,测试使用transport会出问题,目前没找到原因,下图mode还没更改

image.png

5.1.2 修改 conf/es/mytest_user.yml文件

adapter将会自动加载 conf/es 下的所有.yml结尾的配置文件

不需要的文件可删除,只配置需要的yml文件

修改如下配置,esMapping信息,包括 index,type,sql,其中sql尽量保证不要换行,在文本编辑器中编辑成一行,在粘贴进去,否则可能会出问题(还是由于yml格式问题)

此处的sql的字段别名即是es字段名,不写别名,默认原名即是es字段名,有关详细说明,可参见官方文档https://github.com/alibaba/canal/wiki/Sync-ES

etlCondition 可以注释掉,我们默认任何条件都同步

image.png

5.2 启动adapter

启动命令 sh bin/startup.sh

查看日志:

tail -f logs/adapter/adapter.log

观察如下日志,即表示启动成功

image.png

5.3 效果测试

canal是一个MySQL增量订阅组件,所以不支持数据的初始化

我们需要在数据库触发变更,才能将数据同步到es

变更前:

es数据->在kibana查询对应数据,可以看到右侧数据为空

image.png

mysql数据→使用查询sql,查询到数据如下

image.png

下面,我们进行数据变更:

比如将张三名字变成张六,MySQL数据如下:

image.png

此时,es数据在MySQL数据变更同时,es数据相应变更,如下,可以观察到,数据变更已经成功从MySQL同步到elasticsearch

需要注意的是,由于我们只变更了user_info表,所以此处只同步了user_info表的name和id字段,role_name字段,并没有同步,只有role_name字段变更时,才会被同步,所以实际使用时,要先做好数据初始化工作。

image.png

同时,可在logs/adapter/adapter.log观察到数据变更日志

image.png

至此,全部操作完成。


6、全量数据导入

由于canal只支持增量导入,所以官方adapter提供了全量导入手动触发的功能,

canal全表同步(etl功能,手动触发)

参见源码:

/*** ETL curl http://127.0.0.1:8081/etl/hbase/mytest_person2.yml -X POST** @param type 类型 hbase, es* @param task 任务名对应配置文件名 mytest_person2.yml* @param params etl where条件参数, 为空全部导入*/@PostMapping("/etl/{type}/{task}")
publicEtlResultetl(@PathVariableStringtype, @PathVariableStringtask,
@RequestParam(name="params", required=false) Stringparams) {
returnetl(type, null, task, params);
}

只需要发送http命令即可

例:如下,ip和端口是部署canal adapter的IP和端口,然后类型选es,后面在跟上对应的yml配置文件即可

curl http://127.0.0.1:8081/etl/es/mytest_person2.yml -X POST

导入成功提示

image.png


7、常见问题

问题一:

ERROR c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - dump address /192.168.1.50:3306 has an error, retrying. caused by

com.alibaba.otter.canal.parse.exception.CanalParseException: can't find start position for example

原因:meta.dat 中保存的位点信息和数据库的位点信息不一致;导致canal抓取不到数据库的动作;

解决方案:删除meta.dat删除,再重启canal,问题解决;

集群操作:进入canal对应的zookeeper集群下,删除节点/otter/canal/destinations/xxxxx/1001/cursor ;重启canal即可恢复;


问题二:

java.lang.OutOfMemoryError: Java heap space

 

canal消费端挂了太久,在zk对应conf下节点的

/otter/canal/destinations/test_db/1001/cursor 位点信息是很早以前,导致重启canal时,从很早以前的位点开始消费,导致canal服务器内存爆掉

监听数据库变更,只有TransactionBegin/TransactionEnd,没有拿到数据的EventType;

原因可能是canal.instance.filter.black.regex=.*\\..*导致,改canal.instance.filter.black.regex=再重启试试

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
Sqoop 企业级大数据迁移方案实战
Sqoop是一个用于在Hadoop和关系数据库服务器之间传输数据的工具。它用于从关系数据库(如MySQL,Oracle)导入数据到Hadoop HDFS,并从Hadoop文件系统导出到关系数据库。 本课程主要讲解了Sqoop的设计思想及原理、部署安装及配置、详细具体的使用方法技巧与实操案例、企业级任务管理等。结合日常工作实践,培养解决实际问题的能力。本课程由黑马程序员提供。
目录
相关文章
|
2月前
|
canal 消息中间件 关系型数据库
Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
【9月更文挑战第1天】Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
637 4
|
3月前
|
存储 关系型数据库 MySQL
ES的全文索引和MySQL的全文索引有什么区别?如何选择?
【8月更文挑战第26天】ES的全文索引和MySQL的全文索引有什么区别?如何选择?
378 5
|
3月前
|
搜索推荐 关系型数据库 MySQL
不引入ES,如何利用MySQL实现模糊匹配?
【8月更文挑战第23天】在数据处理和查询优化的日常工作中,我们常常面临需要执行模糊匹配的场景,比如搜索用户姓名、商品标题等。虽然Elasticsearch(ES)等搜索引擎提供了高效且强大的文本搜索能力,但在某些轻量级或资源受限的环境中,直接利用MySQL数据库实现模糊匹配也是一个经济且可行的选择。下面,我将分享几种在MySQL中实现模糊匹配的技术方法。
139 0
|
3月前
|
canal 关系型数据库 MySQL
"揭秘阿里数据同步黑科技Canal:从原理到实战,手把手教你玩转MySQL数据秒级同步,让你的数据处理能力瞬间飙升,成为技术界的新晋网红!"
【8月更文挑战第18天】Canal是一款由阿里巴巴开源的高性能数据同步系统,它通过解析MySQL的增量日志(Binlog),提供低延迟、可靠的数据订阅和消费功能。Canal模拟MySQL Slave与Master间的交互协议来接收并解析Binary Log,支持数据的增量同步。配置简单直观,包括Server和Instance两层配置。在实战中,Canal可用于数据库镜像、实时备份等多种场景,通过集成Canal Client可实现数据的消费和处理,如更新缓存或写入消息队列。
752 0
|
5月前
|
canal 关系型数据库 MySQL
蓝易云 - 详解canal同步MySQL增量数据到ES
以上就是使用Canal同步MySQL增量数据到Elasticsearch的基本步骤。在实际操作中,可能还需要根据具体的业务需求和环境进行一些额外的配置和优化。
165 2
|
4月前
|
存储 关系型数据库 MySQL
【Elasticsearch】在es中实现mysql中的FIND_IN_SET查询条件
【Elasticsearch】在es中实现mysql中的FIND_IN_SET查询条件
118 0
|
5月前
|
关系型数据库 MySQL 自然语言处理
不引入ES,如何利用MySQL实现模糊匹配
本文介绍了实现一个公司申请审批流程的业务场景,该流程涉及商务角色申请添加公司,然后由管理员审批。为了防止添加重复的公司,管理员在审批前需检查已有公司信息。核心思路是通过分词、匹配数据库中的数据并按匹配度排序。在技术选型上,由于系统规模小,选择了使用MySQL的正则匹配功能而非引入ES,以降低复杂性。实现过程中,首先对输入的公司名称进行预处理,移除无用信息如地名等,然后使用IKAnalyzer进行分词,最后通过正则表达式在数据库中进行模糊匹配并按匹配度排序。代码示例展示了如何处理公司名称、分词和执行模糊匹配的SQL查询。
|
6月前
|
canal 缓存 关系型数据库
MySQL如何实时同步数据到ES?试试阿里开源的Canal
MySQL如何实时同步数据到ES?试试阿里开源的Canal
194 3
|
9天前
|
SQL 关系型数据库 MySQL
12 PHP配置数据库MySQL
路老师分享了PHP操作MySQL数据库的方法,包括安装并连接MySQL服务器、选择数据库、执行SQL语句(如插入、更新、删除和查询),以及将结果集返回到数组。通过具体示例代码,详细介绍了每一步的操作流程,帮助读者快速入门PHP与MySQL的交互。
24 1
|
11天前
|
SQL 关系型数据库 MySQL
go语言数据库中mysql驱动安装
【11月更文挑战第2天】
27 4