1. Canal概述
早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。
目前内部版本已经支持mysql和oracle部分版本的日志解析
当前的canal开源版本支持5.7及以下的版本(阿里内部mysql
5.7.13, 5.6.10, mysql 5.5.18和5.1.40/48)
基于日志增量订阅&消费支持的业务:
- 数据库镜像
- 数据库实时备份
- 多级索引 (卖家和买家各自分库索引)
- search build
- 业务cache刷新
- 价格变化等重要业务消息
1.1 工作原理
- canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
- mysql master收到dump请求,开始推送binary log给slave(也就是canal)
- canal解析binary log对象(原始为byte流)
1.2 架构
说明:
- server代表一个canal运行实例,对应于一个jvm
- instance对应于一个数据队列 (1个server对应1…n个instance)
instance模块:
- eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
- eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
- eventStore (数据存储)
- metaManager (增量订阅&消费信息管理器)
1.3 HA机制设计
canal的HA分为两部分,canal server
和canal client
分别有对应的HA实现
- canal server: 为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态.
- canal client: 为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。
整个HA机制的控制主要是依赖了zookeeper的几个特性,watcher和EPHEMERAL节点(和session生命周期绑定)。
Canal Server:
大致步骤:
- canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断 (实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)
- 创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态
- 一旦zookeeper发现canal server A创建的节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance.
- canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect.
Canal Client的方式和canal server方式类似,也是利用zokeeper的抢占EPHEMERAL节点的方式进行控制。
1.4 相关资料
2. 安装部署
2.1 创建数据库用户canal
目标数据库先要创建好canal用的用户
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
2.2 远程拉取
- 访问docker hub获取最新的版本 访问:https://hub.docker.com/r/canal/canal-server/tags/
- 下载对应的版本,比如最新版为1.1.3
docker pull canal/canal-server:v1.1.3
2.3 启动Docker
docker目录下自带了一个run.sh脚本
下载脚本:
wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh
创建启动脚本:
vim start.sh
docker stop canal-server docker rm canal-server sh run.sh -e canal.auto.scan=false \ -e canal.destinations=zlt-test \ -e canal.instance.master.address=192.168.28.130:3306 \ -e canal.instance.dbUsername=canal \ -e canal.instance.dbPassword=canal \ -e canal.instance.connectionCharset=UTF-8 \ -e canal.instance.tsdb.enable=true \ -e canal.instance.gtidon=false \
destinations:目标数据库名
instance.master.address:目标数据库地址
instance.dbUsername:目标数据库用户名
instance.dbPassword:目标数据库密码
具体其他配置可参考AdminGuide
docker模式下,单docker实例只能运行一个instance,主要为配置问题。如果需要运行多instance时,可以自行制作一份docker镜像即可
2.4 运行
sh start.sh • 1
运行效果:看到successful之后,就代表canal-server启动成功,可以启动canal-client连接上来进行binlog订阅了
2.5 MQ消息投递
canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:RocketMQ和Kafka
将上面的启动脚本改为以下,增加MQ相关参数:
docker stop canal-server docker rm canal-server sh run.sh -e canal.auto.scan=false \ -e canal.destinations=zlt-test \ -e canal.instance.master.address=192.168.28.130:3306 \ -e canal.instance.dbUsername=canal \ -e canal.instance.dbPassword=canal \ -e canal.instance.connectionCharset=UTF-8 \ -e canal.instance.tsdb.enable=true \ -e canal.instance.gtidon=false \ -e canal.mq.topic=canal-test \ -e canal.serverMode=RocketMQ \ -e canal.mq.servers=192.168.28.130:9876 \
canal.mq.topic:配置mq的topic
canal.serverMode:tcp(默认), kafka, RocketMQ
canal.mq.servers:mq地址
投递MQ后的消息如下图:
2.6 如果要订阅的是mysql的从库该怎么做?
生产环境中的主库是不能随便重启的,所以订阅的话必须订阅mysql主从的从库,而从库中是默认下只将主库的操作写进中继日志,并写到自己的二进制日志的,所以需要让其成为canal的主库,必须让其将日志也写到自己的二进制日志里面。处理方法:修改my.cnf,增加一行log_slave_updates=1,重启数据库后就可以了。
3. 实时同步数据到ElasticSearch
如今大型的IT系统中,都会使用分布式的方式,作为使用最广泛的数据库,如何将mysql的数据与中间件的数据进行同步,既能确保数据的一致性、及时性,也能做到代码无侵入的方式呢?下面介绍如何实现数据修改后,需要及时的将mysql中的数据更新到elasticsearch中。
3.1 数据同步方案选择
- 代码实现(双写):针对代码中进行数据库的增删改操作时,同时进行elasticsearch的增删改操作。
- mybatis实现:通过mybatis plugin进行实现,截取sql语句进行分析, 针对insert、update、delete的语句进行处理。显然,这些操作如果都是单条数据的操作,是很容易处理的。但是,实际开发中,总是会有一些批量的更新或者删除操作,这时候,就很难进行处理了。
- Aop实现:不管是通过哪种Aop方式,根据制定的规则,如规范方法名,注解等进行切面处理,但依然还是会出现无法处理批量操作数据的问题。
- logstash:logstash类似的同步组件提供的文件和数据同步的功能,可以进行数据的同步,只需要简单的配置就能将mysql数据同步到elasticsearch,但是logstash的原理是每分钟进行一次增量数据查询,将结果同步到elasticsearch,实时性要求特别高的,可能无法满足要求。且此方案的性能不是很好,造成资源的浪费。
实现方式 | 优缺点 |
代码实现 | 技术难度低,侵入性强,实时性高 |
基于mybatis | 有一定的技术难度,但是无法覆盖所有的场景 |
Aop实现 | 技术难度低,半侵入性,需要规范代码,依然无法覆盖所有的场景 |
logstash | 技术难度低,无侵入性,无需开发,但会造成资源浪费,实时性也不高 |
那么是否有什么更好的方式进行处理吗?
- mysql binlog同步,实时性强,对于应用无任何侵入性,且性能更好,不会造成资源浪费。Canal安装部署通过数据库binlog实时抓取数据更新信息推送到消息队列MQ里,然后就可以通过消费MQ消息把数据实时同步到不同的异构数据源里了
3.2 增量同步ES
1.创建索引:在同步之前需要先创建号索引,下面是创建sys_user
索引的例子
curl -X PUT "http://192.168.28.130:9200/sys_user/" -H 'Content-Type: application/json' -d' { "settings" : { "number_of_shards" : 1, "number_of_replicas" : 0 }, "mappings":{ "properties":{ "id": { "type": "long" }, "username": { "type": "text", "fields":{ "keyword":{ "type":"keyword", "ignore_above":256 } }, "analyzer": "ik_max_word" }, "nickname": { "type": "text", "fields":{ "keyword":{ "type":"keyword", "ignore_above":256 } }, "analyzer": "ik_max_word" }, "mobile": { "type": "text", "fields":{ "keyword":{ "type":"keyword", "ignore_above":256 } }, "analyzer": "ik_max_word" }, "sex": { "type": "keyword" }, "type": { "type": "text", "fields":{ "keyword":{ "type":"keyword", "ignore_above":256 } }, "analyzer": "ik_max_word" }, "createTime": { "type": "date" }, "updateTime": { "type": "date" }, "company": { "type": "text", "fields":{ "keyword":{ "type":"keyword", "ignore_above":256 } }, "analyzer": "ik_max_word" }, "openId": { "type": "text", "fields":{ "keyword":{ "type":"keyword", "ignore_above":256 } } }, "isDel": { "type": "keyword" } } } }
地址192.168.28.130:9200为es的ip地址
sys_user为索引名
2.安装配置Adapter,下载adapter:https://github.com/alibaba/canal/releases
详细的配置说明请参考官方wiki:https://github.com/alibaba/canal/wiki/Sync-ES
3.同步表sys_user的配置样例:
canal.adapter-xxx\conf\application.yml
:
server: port: 8081 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf: mode: rocketMQ # kafka rocketMQ mqServers: 192.168.28.130:9876 #or rocketmq flatMessage: true batchSize: 500 syncBatchSize: 1000 retries: 0 timeout: accessKey: secretKey: srcDataSources: defaultDS: url: jdbc:mysql://192.168.28.130:3306/user-center?useUnicode=true username: canal password: canal canalAdapters: - instance: canal-sys-user # canal instance Name or mq topic name groups: - groupId: g1 outerAdapters: - name: logger - name: es hosts: 192.168.28.130:9300 properties: cluster.name: my-es
mode:消费的类型有3种选择tcp、kafka和rocketMQ
mqServers: mq的地址
defaultDS:配置源数据库的地址
instance:配置mq的topic名称
es:配置es的地址和集群名
canal.adapter-xxx\conf\es\sys_user.yml:
dataSourceKey: defaultDS destination: canal-sys-user groupId: g1 esMapping: _index: sys_user _type: search_data _id: id upsert: true sql: "select id, username, nickname, mobile , case when sex = 0 then '男' when sex = 1 then '女' end sex , case when type = 'app' then '移动用户' when type = 'BACKEND' then '后台用户' end type , create_time createTime, update_time updateTime, company, open_id openId , case when is_del = 0 then '否' when is_del = 1 then '是' end isDel from sys_user" etlCondition: "where update_time >= '{0}'" commitBatch: 3000
dataSourceKey:配置application.yml中源数据库的key
destination:配置mq的topic名称
_index:插入es中的索引名
_type:插入es中mappings的type属性
_id:配置id字段
upsert:配置插入数据正常时写入,主键冲突时更新
sql:配置具体要同步es的数据
etlCondition:条件判断,通过更新日期实现增量同步
3.3 历史数据全量同步ES
如果在搭建增量同步之前mysql
数据库已经存在历史数据,就需要做初始化同步,全量同步可以使用Canal-Adapter
的rest-api
来实现
全量同步初始化,例子如下:
curl -X POST http://192.168.28.130:8081/etl/es/sys_user.yml
ip为Canal-Adapter所在服务器ip
路径/es/sys_user.yml为conf目录下配置文件的路径,会自动忽略where条件进行全量同步