Canal实现mysql数据与redis同步
1.Canal简介
canal [kə'næl],译意为水道/管道/沟渠,canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。
2.项目背景
早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。
基于日志增量订阅和消费支持的业务包括:
1.数据库镜像
2.数据库实时备份
3.多级索引 (卖家和买家各自分库索引)
4.search build
5.业务cache刷新
6.带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
3.工作原理
3.1.MySQL主从复制原理
1.MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
2.MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
3.MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
3.2.Canal工作原理
1.canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
2.MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
3.canal 解析 binary log 对象(原始为 byte 流)
实现步骤
MySQL配置
我使用的是mysql5.6.x,修改mysql的配置文件my.ini,加入下面内容
# 日志存放路径 general_log=ON general_log_file=G:\mysql-cluster\master\master.log #default-character-set=utf8 character-set-server=utf8 default-storage-engine=InnoDB #innodb_force_recovery = 1 # server_id = ..... log_bin=mysql-bin binlog-format=ROW server-id=1 # 需要同步的数据库名称 binlog-do-db=test_canal # 忽略的数据库,建议填写 binlog-ignore-db=mysql |
配置完后重新启动数据库服务
执行下列授权语句
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT,REPLICATION SLAVE,REPLICATION client ON *.* to 'canal'@'localhost' FLUSH PRIVILEGES; |
Canal部署与配置
下载canal:https://github.com/alibaba/canal/releases/
这里我下载的是1.1.5
下载完成解压即可,修改配置
canal/conf/canal.properties可保持不变,默认的端口5个1
canal/conf/example/instance.properties需要配置
################################################# ## mysql serverId , v1.0.26+ will autoGen # canal.instance.mysql.slaveId=0
# enable gtid use true/false # 这里的id不要跟mysql的server-id相同 canal.instance.mysql.slaveId=123 canal.instance.gtidon=false
# position info canal.instance.master.address=127.0.0.1:3307 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid=
# rds oss binlog canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId=
# table meta tsdb info canal.instance.tsdb.enable=false #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb #canal.instance.tsdb.dbUsername=canal #canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #canal.instance.standby.gtid=
# username/password canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.connectionCharset = UTF-8 # enable druid Decrypt database password canal.instance.enableDruid=false #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex canal.instance.filter.regex=.*\\..* # table black regex canal.instance.filter.black.regex=mysql\\.slave_.* # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config canal.mq.topic=example # dynamic topic route by schema or table regex #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..* canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 #canal.mq.partitionHash=test.table:id^name,.*\\..* #canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6 #################################################
|
配置完成后启动canal,我这里是windows系统,直接双击bin/startup.bat即可
查看是否正常启动,需要看2个日志文件
logs/canal/canal.log文件中有如下内容:the canal server is running now ......
logs/example/example.log文件中有如下内容:start successful....
证明启动成功
Java的canal客户端
创建maven工程
导入pom依赖
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.0</version> </dependency> <dependency> |
Redis工具类
package cn.demo.util; |
Canal客户端测试类
package cn.demo.test; |
测试效果,触发数据库变更
执行sql语句查看redis相应的变化
经过测试,mysql的数据已经同步到redis,测试成功