Canal 学习笔记

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: Canal 学习笔记

Canal 学习笔记


前言

canal github 链接:https://github.com/alibaba/canal

canal 官方文档地址:https://github.com/alibaba/canal/wiki

canal 下载地址:https://github.com/alibaba/canal/releases

数据监控 github 地址:https://github.com/chenqian56131/spring-boot-starter-canal


MySQL主备复制原理

image.png


  • 中记录叫做二进制日志事件 binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据


canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)


MySQL 准备

备注:docker 运行的 mysql 不需要配置 binlog


开启 Binlog

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复,作为唯一标识


授权

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;


Canal-Deployer

docker 地址:https://hub.docker.com/r/canal/canal-server


Docker 安装

# 下载
docker pull canal/canal-server
# 运行
docker run -di --name canal-server -p 11111:11111 canal/canal-server


Tar 包安装

下载与安装


下载地址:https://github.com/alibaba/canal/releases

下载并解压 canal.deployer-1.1.4.tar.gz

tar -zxvf canal.deployer-1.1.4.tar.gz  -C /usr/local/app/canal/deployer


目录结构如下

drwxr-xr-x. 2 root root   76 Oct 10 22:33 bin
drwxr-xr-x. 5 root root  123 Oct 10 22:33 conf
drwxr-xr-x. 2 root root 4096 Oct 10 22:33 lib
drwxrwxrwx. 2 root root    6 Sep  2 03:26 logs


配置

canal.properties 配置

# canal.id 唯一标识,不能重复
canal.id = 192


同步配置,conf/example/instance.properties 文件进行配置

## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息,主数据库的ip和port
canal.instance.master.address = 192.168.8.4:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password,需要改成自己的数据库信息,主数据库账户密码
canal.instance.dbUsername = root  
canal.instance.dbPassword = 123456
canal.instance.defaultDatabaseName = 
canal.instance.connectionCharset = UTF-8
#table regex,.\* 表示所有数据库,.\\\\..\* 表示所有表, .\*\\\\..\* 表示所有库表
canal.instance.filter.regex = .\*\\\\..\*
# 过滤指定库表
# canal.instance.filter.regex = changgou_content.\*,changgou_goods.\*
# mq config——外部(自定义的微服务)监听,需要指定 topic 名称
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*


  • canal.instance.connectionCharset 代表数据库的编码方式对应到 Java 中的编码类型
  • 如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false


命令

# 启动命令
sh bin/startup.sh
# 关闭命令
sh bin/stop.sh


日志


  • 查看 server 日志 :logs/canal/canal.log
  • 查看 instance 的日志 :logs/example/example.log


数据监控

数据监控 github 地址:https://github.com/chenqian56131/spring-boot-starter-canal


将 starter-canal 安装到本地 maven 仓库中(温馨提示:可以通过 idea 导入项目后,执行 clean compile package install 即可安装到 maven 仓库中)

入门项目


pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>changgou-service</artifactId>
        <groupId>com.changgou</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>changgou-service-canal</artifactId>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>com.xpand</groupId>
            <artifactId>starter-canal</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
    </dependencies>
</project>

application.yml

server:
  port: 18083
spring:
  application:
    name: canal
eureka:
  client:
    service-url:
      defaultZone: http://127.0.0.1:7001/eureka
  instance:
    prefer-ip-address: true
feign:
  hystrix:
    enabled: true
# canal 配置
canal:
  client:
    instances:
      # example 是 canal 配置的 topic 名
      example:
        host: 192.168.8.5
        port: 11111


启动类

package com.changgou;
import com.xpand.starter.canal.annotation.EnableCanalClient;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
/**
 * @EnableCanalClient 开启 canal 客户端
 * 
 * @Author Theodore
 * @Date 2019/10/11 15:17
 */
@EnableCanalClient
@EnableEurekaClient
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class CanalApplication {
    public static void main(String[] args) {
        SpringApplication.run(CanalApplication.class, args);
    }
}


监听类

package com.changgou.canal;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xpand.starter.canal.annotation.*;
import java.util.List;
/**
 * @CanalEventListener 监听 canal
 *
 * @Author Theodore
 * @Date 2019/10/11 15:25
 */
@CanalEventListener
public class CanalDataEventListener {
    /**
     * @InsertListenPoint : 增加数据监听,只有增加后的数据:
     * rowData.getAfterColumnsList() : 适用于增加、修改操作
     * rowData.getBeforeColumnsList() : 适用于删除、修改操作
     * @param eventType :当前操作的类型,插入数据操作
     * @param rowData :发生变更的一行数据记录
     */
    @InsertListenPoint
    public void onEventInsert(CanalEntry.EventType eventType, CanalEntry.RowData rowData){
        for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
            System.out.println("列名:" + column.getName() + "\t插入后对应的值:" + column.getValue());
        }
        // 获取数据后可以对数据做下一步处理
    }
    /**
     * @UpdateListenPoint 修改数据监听,
     * @param eventType 当前操作的类型,插入数据操作
     * @param rowData 发生变更的一行数据记录
     */
    @UpdateListenPoint
    public void onEventUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData){
        List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
        for (CanalEntry.Column column : beforeColumnsList) {
            System.out.println("列名:" + column.getName() + "\t修改前对应的值:" + column.getValue());
        }
        List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
        for (CanalEntry.Column column : afterColumnsList) {
            System.out.println("列名:" + column.getName() + "\t修改后对应的值:" + column.getValue());
        }
    }
    /**
     * @DeleteListenPoint 删除数据监听
     * @param eventType
     * @param rowData
     */
    @DeleteListenPoint
    public void onEventDelete(CanalEntry.EventType eventType, CanalEntry.RowData rowData){
        String name = eventType.getValueDescriptor().getName();
        System.out.println("name: " + name);
        List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
        for (CanalEntry.Column column : beforeColumnsList) {
            System.out.println("列名:" + column.getName() + "\t删除前对应的值:" + column.getValue());
        }
    }
    /**
     * 自定义监听
     * 
     * eventType = {CanalEntry.EventType.DELETE, CanalEntry.EventType.UPDATE} // 监听操作的类型
     * schema = {"changgou_content"} // 指定监听的schema,mysql 没有schema之分,相当于 mysql数据库
     * table = {"tb_content"} // 指定监听哪些表
     * destination = "example" // 监听的 topic
     * @param eventType
     * @param rowData
     */
    @ListenPoint(
            eventType = {CanalEntry.EventType.DELETE, CanalEntry.EventType.UPDATE},
            schema = {"changgou_content"},
            table = {"tb_content"},
            destination = "example"
    )
    public void onEventDiy(CanalEntry.EventType eventType, CanalEntry.RowData rowData){
        List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
        for (CanalEntry.Column column : beforeColumnsList) {
            System.out.println("列名:" + column.getName() + "\t修改前对应的值:" + column.getValue());
        }
        List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
        for (CanalEntry.Column column : afterColumnsList) {
            System.out.println("列名:" + column.getName() + "\t修改后对应的值:" + column.getValue());
        }
    }
}


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
canal SQL 关系型数据库
|
canal SQL 关系型数据库
Canal报错总结(三)
Canal报错总结(三)
|
canal 关系型数据库 MySQL
Canal服务搭建
Canal服务搭建
1156 1
Canal服务搭建
|
1月前
|
canal 监控 关系型数据库
canal的特点是什么?如何使用?
【10月更文挑战第23天】canal的特点是什么?如何使用?
117 3
|
5月前
|
canal 监控 关系型数据库
Canal使用和安装总结
Canal使用和安装总结
246 2
|
7月前
|
canal SQL 关系型数据库
Canal入门
Canal入门
211 1
|
canal druid 关系型数据库
Canal报错总结(二)
Canal报错总结(二)
|
canal SQL 缓存
Canal1.1.6安装部署
Canal1.1.6安装部署
370 0
|
canal 消息中间件 关系型数据库
canal同步binlog实战
canal同步binlog实战
|
canal 关系型数据库 MySQL
Canal
Canal是一个用于MySQL数据增量订阅和消费的开源组件,支持多种数据订阅方式,包括基于GTID位点的订阅。
1128 0