canal应用-1个server+2个instance+2个client+2个mysql

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: canal应用-1个server+2个instance+2个client+2个mysql

一 canal应用架构设计



网络异常,图片无法展示
|



组件说明:


  1. linux内核版本(CentOS Linux 7):(命令:uname -a)
    Linux slave1 3.10.0-693.el7.x86_64 #1 SMP Tue Aug 22 21:09:27 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
  2. mysql版本:(SQL命令:select version(); 或 status)
    Server version: 5.6.43-log MySQL Community Server (GPL)
  3. canal版本:canal-1.1.3
  4. JDK版本: 1.8


canal工作原理:


  1. 模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议;
  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal);
  3. 解析binary log对象(原始为byte流)


了解更多详细更新可以查看文章:【了解canal,看这个就够了】


二 架构落地实现流程



2.1 mysql配置与安装


1. 下载安装

在192.168.175.21和192.168.175.22上分别安装mysql,具体安装流程可参考文章:Linux-安装MySQL.


2. 创建canal账户

在创建root账号并设置远程访问之后,接着创建canal账号并设置远程访问和权限:

mysql> CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
mysql> GRANT ALL ON canal.* TO 'canal'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'canal'@'%';
mysql>FLUSH PRIVILEGES;


3. 验证登录

#远程登录
mysql -h 192.168.175.22 -P 3306 -u canal -pcanal
#本地登录
mysql -ucanal -pcanal


4. 修改my.cnf配置


分别在175.21和175.22两台服务器修改my.conf配置,查找my.cnf配置位置命令:whereis my.


192.168.175.21中的my.cnf配置新增如下内容:

log_bin=mysql-bin  #指定bin-log的名称,尽量可以标识业务含义
binlog_format=row  #选择row模式,必须!!!
server_id=1  #mysql服务器id


2.2 canal server配置与启动


1. 下载canal

下载地址: https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz


2.上传并解压

进入192.168.175.20服务器,使用rz命令上传,使用如下命令进行解压至/usr/local/hadoop/app/canal:

tar xzvf canal.deployer-1.1.3.tar.gz -C canal


3. 修改配置


新解压的文件夹/usr/local/hadoop/app/canal/conf/有一个example文件夹,一个example就代表一个instance实例.而一个instance实例就是一个消息队列,所以这里可以将文件名改为example1,同时再复制出来一个叫example2.(命名可以使用监听的数据库名)


修改/usr/local/hadoop/app/canal/conf/example1/instance.properties配置文件:

canal.instance.master.address=192.168.175.21:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.mq.topic=example1


修改/usr/local/hadoop/app/canal/conf/example2/instance.properties配置文件:

canal.instance.master.address=192.168.175.22:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.mq.topic=example2


配置文件参数说明,可查看:https://github.com/alibaba/canal/wiki/AdminGuide


4. 启动canal server


进入文件夹/usr/local/hadoop/app/canal/bin执行如下命令:

./startup.sh


查看日志/usr/local/hadoop/app/canal/logs/canal/canal.log,出现如下内容,即表示启动成功:

2019-06-07 21:15:03.372 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2019-06-07 21:15:03.427 [main] INFO  c.a.o.c.d.monitor.remote.RemoteConfigLoaderFactory - ## load local canal configurations
2019-06-07 21:15:03.529 [main] INFO  com.alibaba.otter.canal.deployer.CanalStater - ## start the canal server.
2019-06-07 21:15:06.251 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.175.22:11111]
2019-06-07 21:15:22.245 [main] INFO  com.alibaba.otter.canal.deployer.CanalStater - ## the canal server is running now ......


5. 启动canal client


注意运行canal客户端代码时,一定要先启动canal server!!!


(1) 添加pom依赖

<!--canal-->
    <dependency>
      <groupId>com.alibaba.otter</groupId>
      <artifactId>canal.client</artifactId>
      <version>1.1.3</version>
    </dependency>


(2) canal client代码:

package com.xgh.canal;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;
public class CanalClientTest {
    public static void main(String args[]) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.175.20", 11111),
                "example1", "", "");//或者example2
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");//订阅所有库下面的所有表
            //connector.subscribe("canal.t_canal");//订阅库canal库下的表t_canal
            connector.rollback();
            int totalEmtryCount = 1200;
            while (emptyCount < totalEmtryCount) {//实际生产中需要设置为true,死循环
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);//此時代表當前數據庫無遍更數據
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    emptyCount = 0;
                    System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }
    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            System.out.println("rowChare ======>"+rowChage.toString());
            EventType eventType = rowChage.getEventType(); //事件類型,比如insert,update,delete
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(),//mysql的my.cnf配置中的log-bin名稱
                    entry.getHeader().getLogfileOffset(), //偏移量
                    entry.getHeader().getSchemaName(),//庫名
                    entry.getHeader().getTableName(), //表名
                    eventType));//事件名
            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }
    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}


canal client运行实例:

empty count : 1
empty count : 2
empty count : 3
empty count : 4


6. 触发数据库变更


创建库:create database canal;

创建表:create table t_canal (id int,name varchar(20),status int);

插入数据:insert into t_canal values(10,'hello',1);

canal client输出日志:

================> binlog[mysql-bin.000001:6764] , name[canal,t_canal] , eventType : INSERT
id : 10    update=true
name : hello    update=true
status : 1    update=true


三. 自问自答-为何设置了数据表的过滤条件,但貌似没有生效?


:首先看文档AdminGuide,了解canal.instance.filter.regex的书写格式。mysql 数据解析关注的表,Perl正则表达式.多个正则之间以逗号(,)分隔,转义符需要双斜杠(\)

常见例子:


  1. 所有表:.*   or  .\..
  2. canal schema下所有表: canal\..*
  3. canal下的以canal打头的表:canal\.canal.*
  4. canal schema下的一张表:canal.test1
  5. 多个规则组合使用:canal\..*,mysql.test1,mysql.test2 (逗号分隔)

检查binlog格式,过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)。


检查下CanalConnector是否调用subscribe(filter)方法;有的话,filter需要和instance.properties的canal.instance.filter.regex一致,否则subscribe的filter会覆盖instance的配置,如果subscribe的filter是...,那么相当于你消费了所有的更新数据

特别注意


参考文章:
  1. https://www.cnblogs.com/jayinnn/p/9606466.html
  2. https://github.com/alibaba/canal
相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
13天前
|
存储 关系型数据库 MySQL
MySQL在企业内部应用场景有哪些
【10月更文挑战第17天】MySQL在企业内部应用场景有哪些
24 0
|
13天前
|
存储 关系型数据库 MySQL
介绍一下MySQL的一些应用场景
【10月更文挑战第17天】介绍一下MySQL的一些应用场景
54 0
|
2月前
|
canal 消息中间件 关系型数据库
Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
【9月更文挑战第1天】Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
534 4
|
19天前
|
架构师 关系型数据库 MySQL
MySQL最左前缀优化原则:深入解析与实战应用
【10月更文挑战第12天】在数据库架构设计与优化中,索引的使用是提升查询性能的关键手段之一。其中,MySQL的最左前缀优化原则(Leftmost Prefix Principle)是复合索引(Composite Index)应用中的核心策略。作为资深架构师,深入理解并掌握这一原则,对于平衡数据库性能与维护成本至关重要。本文将详细解读最左前缀优化原则的功能特点、业务场景、优缺点、底层原理,并通过Java示例展示其实现方式。
41 1
|
28天前
|
关系型数据库 MySQL 数据库
MySQL数据库:基础概念、应用与最佳实践
一、引言随着互联网技术的快速发展,数据库管理系统在现代信息系统中扮演着核心角色。在众多数据库管理系统中,MySQL以其开源、稳定、可靠以及跨平台的特性受到了广泛的关注和应用。本文将详细介绍MySQL数据库的基本概念、特性、应用领域以及最佳实践,帮助读者更好地理解和应用MySQL数据库。二、MySQL
72 5
|
19天前
|
关系型数据库 MySQL 数据处理
企业级应用 mysql 日期函数变量,干货已整理
本文详细介绍了如何在MySQL8.0中使用DATE_FORMAT函数进行日期格式的转换,包括当日、昨日及不同时间段的数据获取,并提供了实际的ETL应用场景和注意事项,有助于提升数据处理的灵活性和一致性。
36 0
|
2月前
|
数据采集 关系型数据库 MySQL
MySQL表约束的种类与应用
在设计数据库时,合理应用各种约束对于创建一个结构化良好且能够有效维护数据完整性的数据库至关重要。每种约束类型都有其特定的应用场景,理解并正确应用这些约束,可以大大提高数据库应用的稳定性和性能。
36 3
|
2月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
2月前
|
关系型数据库 MySQL 数据库
docker启动mysql多实例连接报错Can’t connect to local MySQL server through socket ‘/var/run/mysqld/mysqld.sock’
docker启动mysql多实例连接报错Can’t connect to local MySQL server through socket ‘/var/run/mysqld/mysqld.sock’
163 0
|
20天前
|
存储 关系型数据库 MySQL
Mysql(4)—数据库索引
数据库索引是用于提高数据检索效率的数据结构,类似于书籍中的索引。它允许用户快速找到数据,而无需扫描整个表。MySQL中的索引可以显著提升查询速度,使数据库操作更加高效。索引的发展经历了从无索引、简单索引到B-树、哈希索引、位图索引、全文索引等多个阶段。
55 3
Mysql(4)—数据库索引