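canal [kə'næl], meaning waterway/pipe/channel, is mainly used to parse MySQL's incremental logs (the binlog) and provide subscription to and consumption of incremental data. It does not synchronize data that already exists (no full sync). Because it is built on the binlog, inserts, updates and deletes in MySQL are reflected as inserts, updates and deletes in Elasticsearch in real time.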
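How Canal works
- canal emulates the MySQL slave interaction protocol: it presents itself as a MySQL slave and sends a dump request to the MySQL master.
- On receiving the dump request, the MySQL master starts pushing the binary log to the slave (that is, canal).
- canal parses the binary log objects (originally a byte stream).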
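Step 2 means the master streams the binlog from a starting point. canal tracks how far it has read as a binlog file name plus a byte offset; the console output at the end of this article prints positions such as mysql-bin.000001:12924. Below is a minimal Java sketch of ordering such positions. It assumes binlog file names end in a numeric sequence, as in mysql-bin.000001. BinlogPosition is a hypothetical helper, not part of canal:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper (not a canal class): a binlog position "file:offset", e.g. "mysql-bin.000001:5430". */
final class BinlogPosition implements Comparable<BinlogPosition> {
    final String file;
    final long offset;

    BinlogPosition(String file, long offset) {
        this.file = file;
        this.offset = offset;
    }

    /** Parses "file:offset"; the offset is the text after the last ':'. */
    static BinlogPosition parse(String text) {
        int colon = text.lastIndexOf(':');
        if (colon <= 0 || colon == text.length() - 1) {
            throw new IllegalArgumentException("expected file:offset, got " + text);
        }
        return new BinlogPosition(text.substring(0, colon), Long.parseLong(text.substring(colon + 1)));
    }

    /** Numeric suffix of the file name, e.g. 1 for "mysql-bin.000001" (assumed naming scheme). */
    long sequence() {
        return Long.parseLong(file.substring(file.lastIndexOf('.') + 1));
    }

    @Override
    public int compareTo(BinlogPosition other) {
        // A later file always comes after an earlier one; within one file, compare offsets.
        int bySequence = Long.compare(sequence(), other.sequence());
        return bySequence != 0 ? bySequence : Long.compare(offset, other.offset);
    }

    @Override
    public String toString() {
        return file + ":" + offset;
    }

    public static void main(String[] args) {
        List<BinlogPosition> positions = new ArrayList<>();
        positions.add(parse("mysql-bin.000002:4"));
        positions.add(parse("mysql-bin.000001:12924"));
        positions.add(parse("mysql-bin.000001:5430"));
        Collections.sort(positions);
        System.out.println(positions); // [mysql-bin.000001:5430, mysql-bin.000001:12924, mysql-bin.000002:4]
    }
}
```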
MySQL configuration
Enable binlog
Canal detects data changes through MySQL's binlog, so binlog must be enabled in MySQL beforehand.
Edit /etc/my.cnf and add the following under the [mysqld] section:
server-id = 7777
log_bin = mysql-bin
binlog_format = row
binlog_row_image = full
expire_logs_days = 10
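Then restart MySQL so that the binlog settings take effect:
systemctl restart mysqld.service
Check that binlog is enabled (-w keeps grep from also matching variables such as log_bin_basename):
[root@mysql-5 ~]# mysqladmin variables -uroot -p123456 | grep -w log_bin
| log_bin | ON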
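The same check can also be run over JDBC, for example from a monitoring job. This sketch assumes a MySQL JDBC driver (such as Connector/J) is on the classpath. BinlogVariablesCheck, the JDBC URL and the root credentials in main are illustrative. The pure problems method is kept separate from the JDBC call so it can be tested without a server, and it checks the settings configured above: binlog on, ROW format, FULL row image and a non-zero server id:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Hypothetical runtime check (not a canal tool) of the binlog settings canal relies on. */
final class BinlogVariablesCheck {

    /** Reads the relevant server variables; SHOW VARIABLES returns (Variable_name, Value) rows. */
    static Map<String, String> load(Connection connection) throws SQLException {
        String sql = "SHOW VARIABLES WHERE Variable_name IN "
                + "('log_bin', 'binlog_format', 'binlog_row_image', 'server_id')";
        Map<String, String> vars = new HashMap<>();
        try (Statement st = connection.createStatement(); ResultSet rs = st.executeQuery(sql)) {
            while (rs.next()) {
                vars.put(rs.getString(1).toLowerCase(Locale.ROOT), rs.getString(2));
            }
        }
        return vars;
    }

    /** Returns human-readable problems; an empty list means the settings look right. */
    static List<String> problems(Map<String, String> vars) {
        List<String> problems = new ArrayList<>();
        if (!"ON".equalsIgnoreCase(vars.get("log_bin"))) problems.add("log_bin is not ON");
        if (!"ROW".equalsIgnoreCase(vars.get("binlog_format"))) problems.add("binlog_format is not ROW");
        if (!"FULL".equalsIgnoreCase(vars.get("binlog_row_image"))) problems.add("binlog_row_image is not FULL");
        String serverId = vars.get("server_id");
        if (serverId == null || serverId.equals("0")) problems.add("server_id is not set");
        return problems;
    }

    public static void main(String[] args) throws SQLException {
        // Illustrative connection details; requires a MySQL JDBC driver on the classpath.
        String url = "jdbc:mysql://192.168.1.14:3306/school";
        try (Connection c = DriverManager.getConnection(url, "root", "123456")) {
            System.out.println(problems(load(c)));
        }
    }
}
```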
Create a user
Create the user canal with password canal and grant it the privileges of a MySQL slave:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
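To confirm the grants took effect, run SHOW GRANTS FOR 'canal'@'%' and inspect the result. The hypothetical helper below (not a canal tool) checks one returned GRANT line for the three privileges above. As a simplification it only counts grants ON *.*, because REPLICATION SLAVE and REPLICATION CLIENT are global privileges. It also treats ALL PRIVILEGES ON *.* (the commented-out alternative) as covering everything:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

/** Hypothetical helper (not a canal tool): which canal privileges a SHOW GRANTS line lacks. */
final class CanalGrantCheck {

    static final List<String> REQUIRED = Arrays.asList("SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT");

    /** grantLine looks like: GRANT SELECT, REPLICATION SLAVE ON *.* TO `canal`@`%` */
    static List<String> missing(String grantLine) {
        String upper = grantLine.trim().toUpperCase(Locale.ROOT);
        int on = upper.indexOf(" ON ");
        int to = on < 0 ? -1 : upper.indexOf(" TO ", on + 4);
        if (!upper.startsWith("GRANT ") || on < 0 || to < 0) {
            throw new IllegalArgumentException("not a GRANT ... ON ... TO line: " + grantLine);
        }
        List<String> missing = new ArrayList<>(REQUIRED);
        // Simplification: only global grants (ON *.*) are counted.
        if (!upper.substring(on + 4, to).trim().equals("*.*")) {
            return missing;
        }
        // Privileges are the comma-separated list between "GRANT " and " ON ".
        List<String> granted = new ArrayList<>();
        for (String privilege : upper.substring("GRANT ".length(), on).split(",")) {
            granted.add(privilege.trim());
        }
        if (granted.contains("ALL PRIVILEGES") || granted.contains("ALL")) {
            missing.clear(); // ALL [PRIVILEGES] ON *.* includes all three
            return missing;
        }
        missing.removeAll(granted);
        return missing;
    }

    public static void main(String[] args) {
        System.out.println(missing("GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO `canal`@`%`")); // []
        System.out.println(missing("GRANT SELECT ON *.* TO `canal`@`%`")); // [REPLICATION SLAVE, REPLICATION CLIENT]
    }
}
```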
Download the Canal package
Download: https://github.com/alibaba/canal/releases
Canal Server configuration
Edit the instance configuration file (vim conf/example/instance.properties). Only the following entries need to change; keep the defaults for everything else:
# MySQL connection info
# position info
canal.instance.master.address=192.168.1.14:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# Match all tables: each table is sent to its own topic
# e.g. database school, table student -> topic school_student
canal.mq.dynamicTopic=.*\\..*
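The comment above describes the naming rule: a table matched by the pattern is sent to a topic named schema_table. Below is a simplified Java model of that rule, an assumption based on the comment rather than canal's implementation. The .properties value .*\\..* unescapes to the regex .*\..*, which is also what the Java literal below produces. When nothing matches, the model returns empty. canal is assumed to fall back to the static topic set by canal.mq.topic in that case:

```java
import java.util.Optional;
import java.util.regex.Pattern;

/** Simplified model (not canal's code) of topic naming with canal.mq.dynamicTopic. */
final class DynamicTopicSketch {

    /** Same regex as the properties value .*\\..* after unescaping: "anything.anything". */
    static final Pattern ALL_TABLES = Pattern.compile(".*\\..*");

    /** Returns "schema_table" when "schema.table" fully matches the pattern, else empty. */
    static Optional<String> topicFor(Pattern pattern, String schema, String table) {
        if (pattern.matcher(schema + "." + table).matches()) {
            return Optional.of(schema + "_" + table);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(topicFor(ALL_TABLES, "school", "student").orElse("<static topic>")); // school_student
    }
}
```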
Edit the canal configuration file (vim /usr/local/canal/conf/canal.properties), keeping the defaults for everything else:
# canal mode: tcp, kafka, rocketMQ or rabbitMQ
canal.serverMode = kafka
# Kafka cluster addresses
kafka.bootstrap.servers = 192.168.1.87:9092,192.168.1.88:9092,192.168.1.89:9092
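A small sanity check of these two entries can catch typos before the server starts. This is a hypothetical helper, not something shipped with canal. It loads the file with java.util.Properties, accepts the four modes listed in the comment, and in kafka mode expects a comma-separated host:port list:

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Hypothetical sanity check (not shipped with canal) for canal.properties. */
final class CanalPropertiesCheck {

    // The modes listed in the comment of canal.properties
    static final List<String> MODES = Arrays.asList("tcp", "kafka", "rocketMQ", "rabbitMQ");

    static List<String> problems(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text)); // '#' lines are comments; spaces around '=' are allowed
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> problems = new ArrayList<>();
        String mode = p.getProperty("canal.serverMode", "").trim();
        if (!MODES.contains(mode)) {
            problems.add("canal.serverMode must be one of " + MODES + ", got '" + mode + "'");
        }
        if (mode.equals("kafka")) {
            String servers = p.getProperty("kafka.bootstrap.servers", "").trim();
            if (servers.isEmpty()) {
                problems.add("kafka.bootstrap.servers is empty");
            } else {
                for (String server : servers.split(",")) {
                    // Each broker entry should look like host:port
                    if (!server.trim().matches("[^:\\s]+:\\d+")) {
                        problems.add("not host:port: " + server.trim());
                    }
                }
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "# canal mode: tcp, kafka, rocketMQ or rabbitMQ",
                "canal.serverMode = kafka",
                "kafka.bootstrap.servers = 192.168.1.87:9092,192.168.1.88:9092,192.168.1.89:9092");
        System.out.println(problems(text)); // []
    }
}
```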
Start the canal server:
bin/startup.sh
Consuming the data from Kafka
Insert a row into the MySQL database:
mysql> insert into student values(8,'tonny',20);
Query OK, 1 row affected (0.01 sec)
The Kafka consumer receives the newly inserted row:
[root@kafka1 ~]# kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic school_student --from-beginning
{"data":[{"id":"8","name":"tonny","age":"20"}],"database":"school","es":1617512377000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(20)","age":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"age":4},"table":"student","ts":1617512405579,"type":"INSERT"}
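Scenario 2: Syncing MySQL data to Elasticsearch
Canal Server configuration
Edit the instance configuration file (vim conf/example/instance.properties). Only the following entries need to change; keep the defaults for everything else: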
# MySQL connection info
# position info
canal.instance.master.address=192.168.1.14:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
Edit the canal configuration file (vim /usr/local/canal/conf/canal.properties), keeping the defaults for everything else:
# canal mode: tcp, kafka, rocketMQ or rabbitMQ
canal.serverMode = tcp
Start the canal server:
bin/startup.sh
Canal-adapter configuration
Edit the launcher configuration
Edit conf/application.yml:
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # Canal Server address (already configured by default in conf/canal.properties above); keep the default
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  # Source database connection
  srcDataSources:
    defaultDS:
      # jdbc:mysql://<MySQL host>:<port>/<database>?useUnicode=true
      url: jdbc:mysql://192.168.1.14:3306/school?useUnicode=true
      username: canal # user name
      password: canal # password
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: es7 # reads the yml files under conf/es7
        hosts: 192.168.1.171:9300 # Elasticsearch address
        properties:
          mode: transport
          cluster.name: cr7-elastic # Elasticsearch cluster name
Edit the adapter table mapping file
Edit conf/es7/mytest_user.yml. Note that the table's id must be mapped to the Elasticsearch _id; otherwise a NullPointerException is thrown:
dataSourceKey: defaultDS # key of the source data source, matching srcDataSources above
destination: example # canal instance or MQ topic
groupId: g1 # groupId in MQ mode; only data for this groupId is synced
esMapping:
  _index: mytest_user # name of the index created in Elasticsearch
  _id: _id # id of the documents synced to Elasticsearch; customizable, this article uses _id
  sql: "select t.id as _id,t.id,t.name,t.age from student t" # SQL selecting the fields to sync to Elasticsearch
  commitBatch: 3000 # commit batch size
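Conceptually, each row returned by the mapping sql becomes one Elasticsearch document. The column aliased as _id supplies the document _id, and the remaining columns form _source. This matches the search result shown later: _id "2", with id, name and age in _source. Below is a simplified Java model of that step, not canal-adapter's code. EsDocSketch and its Doc type are hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified model (not canal-adapter's code) of turning one mapped row into an ES document. */
final class EsDocSketch {

    static final class Doc {
        final String id;                  // becomes the document _id
        final Map<String, Object> source; // becomes the document _source

        Doc(String id, Map<String, Object> source) {
            this.id = id;
            this.source = source;
        }
    }

    /**
     * row holds the columns of "select t.id as _id,t.id,t.name,t.age from student t";
     * idColumn is the esMapping _id setting ("_id" in this article).
     */
    static Doc toDoc(Map<String, Object> row, String idColumn) {
        Object id = row.get(idColumn);
        if (id == null) {
            // Without an id the document cannot be addressed (the article reports an NPE here).
            throw new IllegalArgumentException("row has no value for " + idColumn);
        }
        Map<String, Object> source = new LinkedHashMap<>(row);
        source.remove(idColumn); // the id column is used as _id, not stored in _source
        return new Doc(String.valueOf(id), source);
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("_id", 2);
        row.put("id", 2);
        row.put("name", "chengzw");
        row.put("age", 18);
        Doc doc = toDoc(row, "_id");
        System.out.println(doc.id + " " + doc.source); // 2 {id=2, name=chengzw, age=18}
    }
}
```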
Start canal-adapter:
bin/startup.sh
After a successful start, the canal-adapter log contains lines like the following:
[root@canal canal-adapter]# tail -f /usr/local/canal-adapter/logs/adapter/adapter.log
# Output:
......
2021-04-04 13:32:58.625 [http-nio-8081-exec-10] INFO com.alibaba.otter.canal.adapter.launcher.rest.CommonRest - #Destination: example sync on
2021-04-04 13:32:58.631 [Thread-4] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============
2021-04-04 13:32:58.677 [Thread-4] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============
......
Querying the data in Elasticsearch
Create the index mapping first. Otherwise canal cannot recognize the index and reports a write error:
PUT mytest_user
{
  "mappings": {
    "properties": {
      "name": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        }
      },
      "age": {
        "type": "integer"
      }
    }
  }
}
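The same PUT can be sent from Java. The sketch below uses the JDK 11+ java.net.http client. It assumes the Elasticsearch REST port is the default 9200 on the node configured above; the adapter itself talks to port 9300 in transport mode. CreateIndexSketch is a hypothetical name:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Sketch: create the mytest_user index mapping over the Elasticsearch REST API. */
final class CreateIndexSketch {

    // Same mapping as the PUT request above, as a compact JSON string
    static final String MAPPING = "{\"mappings\":{\"properties\":{"
            + "\"name\":{\"type\":\"text\",\"fields\":{\"keyword\":{\"type\":\"keyword\"}}},"
            + "\"age\":{\"type\":\"integer\"}}}}";

    /** Builds PUT {baseUrl}/{index} with the mapping as a JSON body. */
    static HttpRequest createIndexRequest(String baseUrl, String index) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/" + index))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(MAPPING))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Assumption: REST API on the default port 9200 of the Elasticsearch node used above.
        HttpRequest request = createIndexRequest("http://192.168.1.171:9200", "mytest_user");
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```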
Insert a row into the MySQL database:
mysql> insert into student values(2,'chengzw',18);
Query OK, 1 row affected (0.01 sec)
Check the canal-adapter log:
2021-04-04 13:36:12.022 [pool-1-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"id":2,"name":"chengzw","age":18}],"database":"school","destination":"example","es":1617514543000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"student","ts":1617514571559,"type":"INSERT"} Affected indexes: mytest_user
The new document is now searchable in Elasticsearch:
GET mytest_user/_search
# Response:
{
  "took" : 8,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "mytest_user",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "id" : 2,
          "name" : "chengzw",
          "age" : 18
        }
      }
    ]
  }
}
If the row is deleted from MySQL, Canal also deletes the corresponding document from Elasticsearch.
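The effect on Elasticsearch can be modeled as applying each row event to a map keyed by document _id: INSERT and UPDATE write the row image, and DELETE removes the entry. This is a simplified sketch, not canal-adapter's code. In particular, overwriting the whole document on UPDATE is an assumption made for brevity:

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model (not canal-adapter's code) of mirroring row events into a keyed document store. */
final class MirrorSketch {

    enum EventType { INSERT, UPDATE, DELETE }

    /** index maps document _id to _source; row is the new row image (ignored for DELETE). */
    static void apply(Map<String, Map<String, Object>> index, EventType type, String id, Map<String, Object> row) {
        switch (type) {
            case INSERT:
            case UPDATE:
                // Simplification: replace the whole document with a copy of the new row image
                index.put(id, new HashMap<>(row));
                break;
            case DELETE:
                index.remove(id);
                break;
            default:
                throw new IllegalArgumentException("unsupported event: " + type);
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, Object>> index = new HashMap<>();
        Map<String, Object> row = new HashMap<>();
        row.put("id", 2);
        row.put("name", "chengzw");
        row.put("age", 18);
        apply(index, EventType.INSERT, "2", row);
        System.out.println(index.containsKey("2")); // true
        apply(index, EventType.DELETE, "2", row);
        System.out.println(index.containsKey("2")); // false
    }
}
```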
Canal-adapter management REST API
# List all canal instances subscribed for sync
[root@canal canal]# curl http://127.0.0.1:8081/destinations
[{"destination":"example","status":"on"}]
# Turn sync off (the message reads "instance example: sync turned off successfully")
[root@canal canal]# curl http://127.0.0.1:8081/syncSwitch/example/off -X PUT
{"code":20000,"message":"实例: example 关闭同步成功"}
# Turn sync on (the message reads "instance example: sync turned on successfully")
[root@canal canal]# curl http://127.0.0.1:8081/syncSwitch/example/on -X PUT
{"code":20000,"message":"实例: example 开启同步成功"}
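The same endpoints can be called from Java, for example to pause sync during maintenance. The sketch below uses the JDK 11+ HttpClient. The base URL assumes the adapter's server.port of 8081 on the local host, and AdapterSwitchSketch is a hypothetical name:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Sketch of calling the canal-adapter endpoints used in the curl examples. */
final class AdapterSwitchSketch {

    /** GET {base}/destinations: lists subscribed instances and their sync status. */
    static HttpRequest destinations(String base) {
        return HttpRequest.newBuilder(URI.create(base + "/destinations")).GET().build();
    }

    /** PUT {base}/syncSwitch/{destination}/{on|off}: turns sync for one instance on or off. */
    static HttpRequest syncSwitch(String base, String destination, boolean on) {
        String state = on ? "on" : "off";
        return HttpRequest.newBuilder(URI.create(base + "/syncSwitch/" + destination + "/" + state))
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Assumption: canal-adapter listens on 127.0.0.1:8081 (server.port in application.yml)
        String base = "http://127.0.0.1:8081";
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest[] requests = {
                syncSwitch(base, "example", false), syncSwitch(base, "example", true), destinations(base)};
        for (HttpRequest request : requests) {
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(request.method() + " " + request.uri() + " -> " + response.body());
        }
    }
}
```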
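Scenario 3: Fetching Canal data through the Java API
Canal Server configuration
Edit the instance configuration file (vim conf/example/instance.properties). Only the following entries need to change; keep the defaults for everything else. The Java client connects to the canal server directly over TCP, so canal.serverMode in conf/canal.properties must be tcp (its default value):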
# MySQL connection info
# position info
canal.instance.master.address=192.168.1.14:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
Add the Maven dependency
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>
Java code
package com.chengzw;

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

/**
 * Canal client
 * @author 程治玮
 * @since 2021/4/4 12:31 PM
 */
public class SimpleCanalClientExample {

    public static void main(String[] args) {
        // Create the connection: canal server address, instance name, username, password
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.1.175", 11111), "example", "", "");
        // Maximum number of entries fetched per call
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            // Open the connection
            connector.connect();
            // Subscribe to all databases and all tables
            connector.subscribe(".*\\..*");
            // Roll back to the last un-acked position, so the next fetch starts from the first un-acked entry
            connector.rollback();
            // Disconnect after this many consecutive empty fetches
            int totalEmptyCount = 2000;
            while (emptyCount < totalEmptyCount) {
                // Fetch up to batchSize entries without acknowledging them
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // ignored, as in the original example
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    // Print the change data
                    printEntry(message.getEntries());
                }
                // Acknowledge the batch
                connector.ack(batchId);
                // connector.rollback(batchId); // on processing failure, roll the batch back
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            // Skip transaction begin/end markers; only row data is printed
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            EventType eventType = rowChange.getEventType();
            // Format: binlog[mysql-bin.000001:5430] , name[school,student] , eventType : INSERT
            // i.e. binlog file name, offset, database name, table name, operation
            System.out.println(String.format("================ binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            // Print the row data
            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    // Deleted row: print the before-image
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    // Inserted row: print the after-image
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    // Updated row: print both images
                    System.out.println("------- before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("------- after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    // Print each column and whether it was updated, e.g.
    // id : 8 update=false
    // name : tonny update=false
    // age : 20 update=false
    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
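The loop above relies on a fetch, process, then ack cycle. getWithoutAck hands out a batch, ack confirms it, and rollback makes un-acked batches available again, which is why the program calls rollback() right after subscribing. Because processing happens before the ack, a crash in between leads to redelivery, so downstream writes should tolerate duplicates. The in-memory model below is hypothetical and only illustrates that contract. Its in-order ack rule is an assumption of the model, not a documented canal guarantee:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical in-memory model (not the canal client or server) of getWithoutAck / ack / rollback. */
final class AckCursorSketch {

    static final class Batch {
        final long id; // -1 when nothing is available, like Message.getId() in the example
        final List<String> entries;

        Batch(long id, List<String> entries) {
            this.id = id;
            this.entries = entries;
        }
    }

    private final List<String> log = new ArrayList<>();        // stands in for the parsed binlog
    private final Deque<long[]> inFlight = new ArrayDeque<>(); // {batchId, endIndex}, oldest first
    private int ackedUpTo = 0;   // entries before this index are confirmed
    private int fetchedUpTo = 0; // entries before this index have been handed out
    private long nextBatchId = 1;

    void append(String entry) {
        log.add(entry);
    }

    /** Hands out up to batchSize new entries; they stay in flight until acked. */
    Batch getWithoutAck(int batchSize) {
        int end = Math.min(log.size(), fetchedUpTo + batchSize);
        if (end == fetchedUpTo) {
            return new Batch(-1, new ArrayList<>());
        }
        List<String> entries = new ArrayList<>(log.subList(fetchedUpTo, end));
        long id = nextBatchId++;
        inFlight.addLast(new long[] {id, end});
        fetchedUpTo = end;
        return new Batch(id, entries);
    }

    /** Confirms a batch; in this model acks must follow fetch order, and -1 is ignored. */
    void ack(long batchId) {
        if (batchId == -1) {
            return;
        }
        long[] oldest = inFlight.peekFirst();
        if (oldest == null || oldest[0] != batchId) {
            throw new IllegalStateException("batch " + batchId + " is not the oldest in flight");
        }
        inFlight.removeFirst();
        ackedUpTo = (int) oldest[1];
    }

    /** Forgets every in-flight batch, so the next fetch starts again right after the last ack. */
    void rollback() {
        inFlight.clear();
        fetchedUpTo = ackedUpTo;
    }

    public static void main(String[] args) {
        AckCursorSketch cursor = new AckCursorSketch();
        cursor.append("insert id=14");
        Batch first = cursor.getWithoutAck(1000);
        cursor.rollback();                        // e.g. the client restarted before acking
        Batch again = cursor.getWithoutAck(1000); // the same entry is delivered again
        System.out.println(first.entries.equals(again.entries)); // true
        cursor.ack(again.id);
        System.out.println(cursor.getWithoutAck(1000).id); // -1
    }
}
```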
Insert a row into the MySQL database:
mysql> insert into student values(14,'mali',20);
Query OK, 1 row affected (0.00 sec)
Console output:
empty count : 1
empty count : 2
empty count : 3
================ binlog[mysql-bin.000001:12924] , name[school,student] , eventType : INSERT
id : 14 update=true
name : mali update=true
age : 20 update=true
References
- https://github.com/alibaba/canal/wiki/ClientAdapter
- https://github.com/alibaba/canal/wiki/Sync-ES
- https://github.com/alibaba/canal/wiki/ClientExample
- https://mp.weixin.qq.com/s/YKqKW0n5JTPgTd9kv9RDhQ
- https://help.aliyun.com/document_detail/135297.html#title-3b7-i1b-4n3
- https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart