Canal 原理与实践

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: Canal 原理与实践

image.pngcanal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,不支持全量已有数据同步。由于采用了 binlog 机制,Mysql 中的新增、更新、删除操作,对应的 Elasticsearch都能实时新增、更新、删除。

image.png

Canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议。
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )。
  • canal 解析 binary log 对象(原始为 byte 流)。

MySQL 配置

开启 binlog

Canal 使用 MySQL 的 binlog 机制实现数据动态变化监测,所以需要 Mysql 提前配置 binlog。

编辑 /etc/my.cnf 的 mysqld 下添加如下配置:

server-id         = 7777
log_bin           = mysql-bin
binlog_format     = row
binlog_row_image  = full
expire_logs_days  = 10

然后,重启一下 Mysql 以使得 binlog 生效。

systemctl restart mysqld.service

检查 binlog 是否开启:

[root@mysql-5 ~]# mysqladmin variables -uroot@123456 | grep log_bin
| log_bin                                                | ON

创建用户

创建用户 canal,密码 canal,并授予 MySQL slave 的权限:

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

下载 Canal 压缩包

下载地址: https://github.com/alibaba/canal/releases

image.png

Canal Server 配置

修改 instance 配置文件 vim conf/example/instance.properties,只需要修改以下内容,其余保存默认即可:

#MySQL数据库连接信息
# position info
canal.instance.master.address=192.168.1.14:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 匹配所有表,则每个表都会发送到各自表名的topic
# 例如数据库名为school,表名为student,则topic名字为school_student
canal.mq.dynamicTopic=.*\\..*

修改 canal 配置文件 vim /usr/local/canal/conf/canal.properties,其余内容保持默认即可:

# canal模式,有tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka
#Kafka集群地址
kafka.bootstrap.servers = 192.168.1.87:9092,192.168.1.88:9092,192.168.1.89:9092

启动 canal server:

bin/startup.sh

Kafka 消费数据

往 MySQL 数据库中插入一条数据:

mysql> insert into student values(8,'tonny',20);
Query OK, 1 row affected (0.01 sec)

Kafka 消费者可以成功消费到新插入的数据:

[root@kafka1 ~]# kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic student_school --from-beginning 
{"data":[{"id":"8","name":"tonny","age":"20"}],"database":"school","es":1617512377000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(20)","age":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"age":4},"table":"student","ts":1617512405579,"type":"INSERT"}

场景二:同步 MySQL 数据到 Elasticsearch

Canal Server 配置

修改 instance 配置文件 vim conf/example/instance.properties,只需要修改以下内容,其余保存默认即可:

#MySQL数据库连接信息
# position info
canal.instance.master.address=192.168.1.14:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

修改 canal 配置文件 vim /usr/local/canal/conf/canal.properties,其余内容保持默认即可:

# canal模式,有tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = tcp

启动 canal server:

bin/startup.sh

Canal-adapter 配置

修改启动器配置

修改conf/application.yml文件:

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # Canal Server的访问地址(在前面的conf/canal.properties中默认配置了),保持默认
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  # 连接的数据库信息
  srcDataSources:
    defaultDS:
    # jdbc:mysql://<MySQL地址>:<端口>/<数据库名称>?useUnicode=true
      url: jdbc:mysql://192.168.1.14:3306/school?useUnicode=true
      username: canal  #用户名
      password: canal  #密码
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1 
      outerAdapters:
      - name: es7 #去读取的conf/es7目录下的yml文件
        hosts: 192.168.1.171:9300 #Elasticsearch地址
        properties:
          mode: transport 
          cluster.name: cr7-elastic  #Elasticsearch集群名字

编辑适配器表映射文件

编辑 conf/es7/mytest_user.yml文件,注意指定对应库表 id 为 Elasticsearch中 的 _id,否则会空指针异常:

dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example #cannal的instance或者MQ的topic 
groupId: g1 #对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:
  _index: mytest_user  #在Elasticsearch实例中所创建的索引的名称
  _id: _id #需要同步到Elasticsearch实例的文档的id,可自定义。本文使用_id
  sql: "select t.id as _id,t.id,t.name,t.age from student t"  #SQL语句,用来查询需要同步到Elasticsearch中的字段
  commitBatch: 3000  #提交批大小

启动 canal-adapter 启动器:

bin/startup.sh

启动成功后查看 canal adapter 日志会有如下内容:

[root@canal canal-adapter]# tail -f /usr/local/canal-adapter/logs/adapter/adapter.log 
#输出结果:
......
2021-04-04 13:32:58.625 [http-nio-8081-exec-10] INFO  com.alibaba.otter.canal.adapter.launcher.rest.CommonRest - #Destination: example sync on
2021-04-04 13:32:58.631 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============
2021-04-04 13:32:58.677 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============
......

Elasicsearch 获取数据

先创建索引对应的 mapping,否则 canal 会无法识别索引,会报写入错误:

PUT mytest_user
{
  "mappings": {
      "properties": {
        "name": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword"
            }
          }
        },
        "age": {
          "type": "integer"
        }
      }
  }
}

往 MySQL 数据库中插入一条数据:

mysql> insert into student values(2,'chengzw',18);
Query OK, 1 row affected (0.01 sec)

查看 canal-adapter 日志:

2021-04-04 13:36:12.022 [pool-1-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"id":2,"name":"chengzw","age":18}],"database":"school","destination":"example","es":1617514543000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"student","ts":1617514571559,"type":"INSERT"} 
Affected indexes: mytest_user

查看 Elasticsearch 可以搜索到新的数据:

GET mytest_user/_search
#返回结果:
{
  "took" : 8,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "mytest_user",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "id" : 2,
          "name" : "chengzw",
          "age" : 18
        }
      }
    ]
  }
}

如果删除 MySQL 数据库数据,Canal 也会将对应 Elasticsearch 上的文档删除。

Canal-adapter 管理 REST 接口

# 查询所有订阅同步的canal instance
[root@canal canal]# curl http://127.0.0.1:8081/destinations
[{"destination":"example","status":"on"}]
# 数据同步关闭
[root@canal canal]# curl http://127.0.0.1:8081/syncSwitch/example/off -X PUT
{"code":20000,"message":"实例: example 关闭同步成功"}
# 数据同步开启
[root@canal canal]# curl http://127.0.0.1:8081/syncSwitch/example/on -X PUT
{"code":20000,"message":"实例: example 开启同步成功"}

场景三:通过 Java API 获取 Canal 数据

Canal Server 配置

修改 instance 配置文件 vim conf/example/instance.properties,只需要修改以下内容,其余保存默认即可:

#MySQL数据库连接信息
# position info
canal.instance.master.address=192.168.1.14:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

引入 maven 依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>

Java 代码

package com.chengzw;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
/**
 * Canal 客户端
 * @author 程治玮
 * @since 2021/4/4 12:31 下午
 */
public class SimpleCanalClientExample {
    public static void main(String args[]) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.1.175",
                11111), "example", "", "");
        //一次性获取数据的数量
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            //打开连接
            connector.connect();
            //订阅全部数据库和全部表
            connector.subscribe(".*\\..*");
            //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
            connector.rollback();
            //指定连续获取空数据最大的次数,达到这个次数断开连接
            int totalEmptyCount = 2000;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries()); //获取数据信息
                }
                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }
    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            EventType eventType = rowChage.getEventType();
            //打印格式:binlog[mysql-bin.000001:5430] , name[school,student] , eventType : INSERT
            ///                  binlog文件名,偏移量,数据库名,表名,操作
            System.out.println(String.format("================ binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            //打印数据
            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {  //删除数据
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {  //插入数据
                    printColumn(rowData.getAfterColumnsList());
                } else {  //更新数据
                    System.out.println("------- before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("------- after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }
     // 打印数据每个字段的更新情况
     // id : 8    update=false
     // name : tonny    update=false
     // age : 20    update=false
    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

往 MySQL 数据库中插入一条数据:

mysql> insert into student values(14,'mali',20);
Query OK, 1 row affected (0.00 sec)

查看控制台输出结果:

empty count : 1
empty count : 2
empty count : 3
================ binlog[mysql-bin.000001:12924] , name[school,student] , eventType : INSERT
id : 14    update=true
name : mali    update=true
age : 20    update=true

参考链接


目录
相关文章
|
canal 关系型数据库 MySQL
Canal服务搭建
Canal服务搭建
1154 1
Canal服务搭建
|
1月前
|
canal 监控 关系型数据库
canal的特点是什么?如何使用?
【10月更文挑战第23天】canal的特点是什么?如何使用?
100 3
|
7月前
|
消息中间件 存储 设计模式
Kafka原理篇:图解kakfa架构原理
Kafka原理篇:图解kakfa架构原理
506 1
|
7月前
|
canal SQL 关系型数据库
Canal入门
Canal入门
203 1
|
7月前
|
消息中间件 存储 关系型数据库
探究Kafka原理-2.Kafka基本命令实操(下)
探究Kafka原理-2.Kafka基本命令实操
83 0
|
7月前
|
消息中间件 存储 运维
探究Kafka原理-2.Kafka基本命令实操(上)
探究Kafka原理-2.Kafka基本命令实操
93 0
深入浅出阿里数据同步神器:Canal原理+配置+实战全网最全解析!
canal 翻译为管道,主要用途是基于 MySQL 数据库的增量日志 Binlog 解析,提供增量数据订阅和消费。 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
|
消息中间件 NoSQL 中间件
Kafka 实战开篇-讲解架构模型、基础概念以及集群搭建(上)
Kafka 实战开篇-讲解架构模型、基础概念以及集群搭建
341 0
|
消息中间件 存储 Kafka
Kafka 实战开篇-讲解架构模型、基础概念以及集群搭建(下)
Kafka 实战开篇-讲解架构模型、基础概念以及集群搭建(下)
182 0
|
canal Java 中间件
总结 canal 使用过程中的几个问题,值得思考一下
在给 canal 分配数据库权限的过程中,由于密码设置的比较简单,会报以下错误 ERROR 1819 (HY000): Your password does not satisfy the current policy requirements
254 0