I. canal application architecture
Components:
- Linux kernel version (CentOS Linux 7), checked with the command uname -a:
Linux slave1 3.10.0-693.el7.x86_64 #1 SMP Tue Aug 22 21:09:27 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
- MySQL version, checked with the SQL command select version(); or status:
Server version: 5.6.43-log MySQL Community Server (GPL)
- canal version: canal-1.1.3
- JDK version: 1.8
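How canal works:
- canal emulates the MySQL slave interaction protocol, presents itself as a MySQL slave, and sends a dump request to the MySQL master;
- the MySQL master receives the dump request and starts pushing the binary log to the slave (that is, canal);
- canal parses the binary log objects, which arrive as a raw byte stream.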
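To make the last step more concrete: in binlog format v4 (the format MySQL 5.x writes), every event starts with a fixed 19-byte little-endian header holding timestamp, type code, server id, event length, next position and flags. The sketch below decodes only that header with java.nio.ByteBuffer. It is an illustration of what "parsing the byte stream" involves, not canal's own parser; the class name BinlogHeaderSketch is hypothetical, and the event body and checksum are ignored.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical illustration, not canal's parser: decodes only the common event header.
final class BinlogHeaderSketch {
    // Size of the common header of a binlog v4 event.
    static final int HEADER_LEN = 19;

    // Decoded header fields; unsigned 32-bit values are widened to long.
    static final class EventHeader {
        final long timestamp;
        final int typeCode;
        final long serverId;
        final long eventLength;
        final long nextPosition;
        final int flags;

        EventHeader(long timestamp, int typeCode, long serverId,
                    long eventLength, long nextPosition, int flags) {
            this.timestamp = timestamp;
            this.typeCode = typeCode;
            this.serverId = serverId;
            this.eventLength = eventLength;
            this.nextPosition = nextPosition;
            this.flags = flags;
        }
    }

    // Reads the header from the first 19 bytes; all fields are little-endian.
    static EventHeader parse(byte[] raw) {
        if (raw.length < HEADER_LEN) {
            throw new IllegalArgumentException("binlog event header needs " + HEADER_LEN + " bytes");
        }
        ByteBuffer buf = ByteBuffer.wrap(raw).order(ByteOrder.LITTLE_ENDIAN);
        long timestamp = Integer.toUnsignedLong(buf.getInt());   // bytes 0-3
        int typeCode = Byte.toUnsignedInt(buf.get());            // byte 4
        long serverId = Integer.toUnsignedLong(buf.getInt());    // bytes 5-8
        long eventLength = Integer.toUnsignedLong(buf.getInt()); // bytes 9-12
        long nextPosition = Integer.toUnsignedLong(buf.getInt());// bytes 13-16
        int flags = Short.toUnsignedInt(buf.getShort());         // bytes 17-18
        return new EventHeader(timestamp, typeCode, serverId, eventLength, nextPosition, flags);
    }

    // Inverse of parse, handy for building test input.
    static byte[] encode(EventHeader h) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LEN).order(ByteOrder.LITTLE_ENDIAN);
        buf.putInt((int) h.timestamp);
        buf.put((byte) h.typeCode);
        buf.putInt((int) h.serverId);
        buf.putInt((int) h.eventLength);
        buf.putInt((int) h.nextPosition);
        buf.putShort((short) h.flags);
        return buf.array();
    }
}
```

Widening the unsigned 32-bit fields to long keeps values above 2^31 (for example large binlog positions) from turning negative.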
For more details, see the article [Understanding canal: this is all you need].
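II. Implementation steps
2.1 MySQL installation and configuration
1. Download and install
Install MySQL on both 192.168.175.21 and 192.168.175.22; for the detailed steps, see the article Linux - Installing MySQL.
2. Create the canal account
After creating the root account and enabling remote access for it, create the canal account, enable remote access, and grant its privileges: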
mysql> CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
mysql> GRANT ALL ON canal.* TO 'canal'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'canal'@'%';
mysql> FLUSH PRIVILEGES;
3. Verify login
# remote login
mysql -h 192.168.175.22 -P 3306 -u canal -pcanal
# local login
mysql -ucanal -pcanal
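4. Edit my.cnf
Edit my.cnf on both 175.21 and 175.22 (command to locate the file: whereis my).
On 192.168.175.21, add the following under the [mysqld] section of my.cnf:
log_bin=mysql-bin    # base name of the binlog; ideally one that reflects the business meaning
binlog_format=row    # row mode, mandatory!!!
server_id=1          # MySQL server id
Apply the same settings on 192.168.175.22, giving it its own server_id (for example server_id=2), then restart MySQL on both servers so the changes take effect.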
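Since a wrong binlog_format is the most common reason canal sees no row data, a quick pre-flight check of my.cnf can help. The sketch below is a hypothetical helper (MyCnfCheck is not part of MySQL or canal): it collects options from the [mysqld] section, treats '-' and '_' in option names as equivalent (as MySQL does), strips '#' comments, and reports a missing log_bin, a non-ROW binlog_format, or a missing or zero server_id (in MySQL 5.6 a master with server_id 0 refuses slave connections). It deliberately ignores quoting and !include directives.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check for the binlog settings canal relies on.
final class MyCnfCheck {
    // Collects options from the [mysqld] section; later duplicates win.
    static Map<String, String> mysqldOptions(String cnf) {
        Map<String, String> opts = new HashMap<>();
        boolean inMysqld = false;
        for (String raw : cnf.split("\\R")) {
            String line = raw;
            int hash = line.indexOf('#'); // a '#' comment may also start mid-line
            if (hash >= 0) {
                line = line.substring(0, hash);
            }
            line = line.trim();
            if (line.isEmpty() || line.startsWith(";")) {
                continue;
            }
            if (line.startsWith("[")) {
                inMysqld = line.equalsIgnoreCase("[mysqld]");
                continue;
            }
            if (!inMysqld) {
                continue;
            }
            int eq = line.indexOf('=');
            // MySQL treats '-' and '_' in option names as interchangeable.
            String key = (eq >= 0 ? line.substring(0, eq) : line).trim().replace('-', '_');
            String value = eq >= 0 ? line.substring(eq + 1).trim() : "";
            opts.put(key, value);
        }
        return opts;
    }

    // Returns human-readable problems; an empty list means the settings look usable.
    static List<String> problems(String cnf) {
        Map<String, String> o = mysqldOptions(cnf);
        List<String> problems = new ArrayList<>();
        if (!o.containsKey("log_bin")) {
            problems.add("log_bin is not set");
        }
        String format = o.getOrDefault("binlog_format", "");
        if (!format.equalsIgnoreCase("row")) {
            problems.add("binlog_format must be ROW, found '" + format + "'");
        }
        String id = o.getOrDefault("server_id", "");
        if (id.isEmpty() || id.equals("0")) {
            problems.add("server_id must be set to a non-zero value");
        }
        return problems;
    }
}
```

Feeding it the text of /etc/my.cnf (or wherever whereis reports it) before restarting MySQL catches typos such as binlog_format=mixed early.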
2.2 canal server configuration and startup
1. Download canal
Download URL: https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
2. Upload and extract
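Log in to the 192.168.175.20 server, upload the tarball with the rz command, and extract it into /usr/local/hadoop/app/canal (tar -C needs the target directory to exist, so create it first):
mkdir -p /usr/local/hadoop/app/canal
tar xzvf canal.deployer-1.1.3.tar.gz -C /usr/local/hadoop/app/canal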
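3. Edit the configuration
The freshly extracted folder /usr/local/hadoop/app/canal/conf/ contains an example folder. Each such folder represents one instance, and an instance is essentially one message queue, so rename this folder to example1 and make a copy of it named example2 (you can also name each instance after the database it listens to). The instance list in conf/canal.properties (canal.destinations, which defaults to example) typically has to be updated to match, e.g. canal.destinations = example1,example2.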
Edit the configuration file /usr/local/hadoop/app/canal/conf/example1/instance.properties:
canal.instance.master.address=192.168.175.21:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.mq.topic=example1
Edit the configuration file /usr/local/hadoop/app/canal/conf/example2/instance.properties:
canal.instance.master.address=192.168.175.22:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.mq.topic=example2
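Because each instance.properties is a plain Java properties file, a typo in one of the edited keys is easy to catch before startup. The sketch below is a hypothetical helper (InstanceConfigCheck and its EDITED_KEYS list are illustrative, covering only the keys edited above, not everything canal reads); it loads the text with java.util.Properties, which also shows that the spaces in "canal.instance.connectionCharset = UTF-8" are harmless, and validates the host:port form of canal.instance.master.address.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical sanity check for the instance.properties keys edited in this article.
final class InstanceConfigCheck {
    // Only the keys edited above; canal itself reads many more.
    static final String[] EDITED_KEYS = {
        "canal.instance.master.address",
        "canal.instance.dbUsername",
        "canal.instance.dbPassword",
        "canal.instance.connectionCharset",
    };

    // Loads properties text with java.util.Properties semantics.
    static Properties load(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // StringReader does not actually throw
        }
        return p;
    }

    // Lists edited keys that are absent or blank.
    static List<String> missingKeys(Properties p) {
        List<String> missing = new ArrayList<>();
        for (String key : EDITED_KEYS) {
            String value = p.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    // Parses the port out of canal.instance.master.address ("host:port").
    static int masterPort(Properties p) {
        String addr = p.getProperty("canal.instance.master.address", "").trim();
        int colon = addr.lastIndexOf(':');
        if (colon <= 0 || colon == addr.length() - 1) {
            throw new IllegalArgumentException("expected host:port, got '" + addr + "'");
        }
        int port = Integer.parseInt(addr.substring(colon + 1));
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("bad port " + port);
        }
        return port;
    }
}
```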
For an explanation of the configuration parameters, see: https://github.com/alibaba/canal/wiki/AdminGuide
4. Start the canal server
Go to the folder /usr/local/hadoop/app/canal/bin and run:
./startup.sh
Then check the log /usr/local/hadoop/app/canal/logs/canal/canal.log; output like the following means the server started successfully:
2019-06-07 21:15:03.372 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2019-06-07 21:15:03.427 [main] INFO c.a.o.c.d.monitor.remote.RemoteConfigLoaderFactory - ## load local canal configurations
2019-06-07 21:15:03.529 [main] INFO com.alibaba.otter.canal.deployer.CanalStater - ## start the canal server.
2019-06-07 21:15:06.251 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.175.22:11111]
2019-06-07 21:15:22.245 [main] INFO com.alibaba.otter.canal.deployer.CanalStater - ## the canal server is running now ......
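When the startup is scripted, checking the log by eye gets tedious. The sketch below is a hypothetical helper (CanalLogCheck is not shipped with canal) that looks for the two lines shown above: the "the canal server is running now" marker and the host:port printed in "start the canal server[...]". The marker strings are copied from this log sample and may differ in other canal versions.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical log checker; marker strings are taken from the canal 1.1.3 log sample above.
final class CanalLogCheck {
    static final String RUNNING_MARKER = "the canal server is running now";
    // Matches "start the canal server[host:port]" and captures host:port.
    static final Pattern ADDRESS = Pattern.compile("start the canal server\\[([^\\]]+)\\]");

    static boolean isRunning(String logText) {
        return logText.contains(RUNNING_MARKER);
    }

    // Returns the last advertised address, or null if none was logged.
    static String lastAddress(String logText) {
        Matcher m = ADDRESS.matcher(logText);
        String last = null;
        while (m.find()) {
            last = m.group(1);
        }
        return last;
    }

    public static void main(String[] args) throws IOException {
        String path = args.length > 0 ? args[0] : "/usr/local/hadoop/app/canal/logs/canal/canal.log";
        String text = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
        System.out.println("running: " + isRunning(text) + ", address: " + lastAddress(text));
    }
}
```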
5. Start the canal client
Note: always start the canal server before running the canal client code!!!
(1) Add the pom dependency
<!-- canal -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.3</version>
</dependency>
(2) canal client code:
package com.xgh.canal;

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;

public class CanalClientTest {

    public static void main(String[] args) {
        // Create the connection: canal server address, destination (instance name), username, password
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.175.20", 11111), "example1", "", ""); // or "example2"
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*"); // subscribe to all tables in all databases
            // connector.subscribe("canal.t_canal"); // subscribe only to table t_canal in database canal
            connector.rollback(); // resume from the last acked position
            int totalEmptyCount = 1200;
            while (emptyCount < totalEmptyCount) { // in production this should be while (true)
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount); // no new changes in the database right now
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // keep the interrupt flag and stop polling
                        break;
                    }
                } else {
                    emptyCount = 0;
                    System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // confirm the batch was consumed
                // connector.rollback(batchId); // on failure, roll back so the batch is fetched again
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            // Skip transaction begin/end markers; only row changes are printed
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            System.out.println("rowChange ======>" + rowChange.toString());
            EventType eventType = rowChange.getEventType(); // event type, e.g. INSERT, UPDATE, DELETE
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(),   // binlog file name, based on log_bin in my.cnf
                    entry.getHeader().getLogfileOffset(), // offset within the binlog file
                    entry.getHeader().getSchemaName(),    // database name
                    entry.getHeader().getTableName(),     // table name
                    eventType));                          // event type
            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
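The client above relies on the getWithoutAck / ack / rollback pairing: a fetched batch stays pending until it is acked, and a rollback makes it available again, so processing is at-least-once. The sketch below models that contract with a hypothetical in-memory source so the loop can run without a canal server. InMemorySource, consume and the retry limit are illustrative names, not the canal API, and unlike canal the sketch allows only one outstanding batch at a time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of the get/ack/rollback contract; NOT the canal client API.
final class AckLoopSketch {

    // Simplification: at most one batch may be outstanding at a time.
    static final class InMemorySource {
        private final Deque<List<String>> pending = new ArrayDeque<>();
        private List<String> inFlight; // delivered but neither acked nor rolled back
        private long inFlightId = -1;
        private long nextId = 1;

        void offer(List<String> batch) {
            pending.addLast(new ArrayList<>(batch));
        }

        // Copies the next batch into out and returns its id, or -1 if nothing is pending.
        long getWithoutAck(List<String> out) {
            if (inFlight != null) {
                throw new IllegalStateException("batch " + inFlightId + " is still outstanding");
            }
            out.clear();
            List<String> next = pending.pollFirst();
            if (next == null) {
                return -1;
            }
            inFlight = next;
            inFlightId = nextId++;
            out.addAll(next);
            return inFlightId;
        }

        void ack(long id) { // the batch will not be delivered again
            requireInFlight(id);
            inFlight = null;
        }

        void rollback(long id) { // put the batch back so the next get redelivers it
            requireInFlight(id);
            pending.addFirst(inFlight);
            inFlight = null;
        }

        private void requireInFlight(long id) {
            if (inFlight == null || id != inFlightId) {
                throw new IllegalStateException("unknown batch " + id);
            }
        }
    }

    // Polls until maxEmpty consecutive empty fetches. A batch is acked only if every entry
    // was handled; otherwise it is rolled back, so some entries may be handled twice.
    static List<String> consume(InMemorySource source, Predicate<String> handler,
                                int maxEmpty, int maxAttempts) {
        List<String> processed = new ArrayList<>();
        List<String> batch = new ArrayList<>();
        int empty = 0;
        int attempts = 0;
        while (empty < maxEmpty) {
            long id = source.getWithoutAck(batch);
            if (id == -1) {
                empty++; // a real client would sleep here, as CanalClientTest does
                continue;
            }
            empty = 0;
            boolean ok = true;
            for (String entry : batch) {
                if (!handler.test(entry)) {
                    ok = false;
                    break;
                }
            }
            if (ok) {
                processed.addAll(batch);
                source.ack(id);
                attempts = 0;
            } else {
                source.rollback(id);
                if (++attempts >= maxAttempts) {
                    throw new IllegalStateException("batch failed " + attempts + " times");
                }
            }
        }
        return processed;
    }
}
```

The retry limit is the design choice worth copying: without it, a batch that always fails would be rolled back and redelivered forever.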
Sample canal client output:
empty count : 1
empty count : 2
empty count : 3
empty count : 4
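6. Trigger a database change
Run the following on 192.168.175.21, the MySQL server that example1 points to:
Create a database: create database canal;
Create a table: use canal; create table t_canal (id int,name varchar(20),status int);
Insert a row: insert into t_canal values(10,'hello',1);
canal client output: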
================> binlog[mysql-bin.000001:6764] , name[canal,t_canal] , eventType : INSERT
id : 10 update=true
name : hello update=true
status : 1 update=true
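III. Q&A: why does the table filter I configured seem to have no effect?
A: First, read the AdminGuide for the syntax of canal.instance.filter.regex: it lists the tables whose MySQL data should be parsed, as Perl regular expressions. Multiple regexes are separated by commas (,), and the escape character needs a double backslash (\\).
Common examples: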
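- All tables: .* or .*\\..*
- All tables in the canal schema: canal\\..*
- Tables in the canal schema whose names start with canal: canal\\.canal.*
- A single table in the canal schema: canal.test1
- Several rules combined: canal\\..*,mysql.test1,mysql.test2 (comma-separated)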
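A rough way to reason about these rules is sketched below with a hypothetical class, FilterRegexSketch. It reads canal.instance.filter.regex with java.util.Properties, splits it on commas, and keeps a table when its "schema.table" name fully matches one of the patterns. It uses java.util.regex rather than canal's own Perl-style regex engine and assumes whole-name matching, so treat it as an approximation. Assuming the file is read with Properties escape rules, it also suggests why the double backslash matters: \\ loads as \, while a single \ before . is silently dropped.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;

// Hypothetical approximation of table filtering; canal uses its own regex engine.
final class FilterRegexSketch {
    // Reads canal.instance.filter.regex with java.util.Properties escape rules,
    // so "canal\\..*" in the file becomes the regex canal\..* in memory.
    static String readFilter(String propertiesText) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p.getProperty("canal.instance.filter.regex", "");
    }

    // Compiles a comma-separated filter into individual patterns.
    static List<Pattern> compile(String filter) {
        List<Pattern> patterns = new ArrayList<>();
        for (String part : filter.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                patterns.add(Pattern.compile(trimmed));
            }
        }
        return patterns;
    }

    // A table is kept if its "schema.table" name fully matches any pattern.
    static boolean accepts(List<Pattern> patterns, String schema, String table) {
        String fullName = schema + "." + table;
        for (Pattern p : patterns) {
            if (p.matcher(fullName).matches()) {
                return true;
            }
        }
        return false;
    }
}
```

With a single backslash the loaded regex becomes canal..*, which would also accept a schema such as canal_x, so the filter silently becomes looser than intended.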
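Check the binlog format: the filter only works for row-format data (ps. in mixed/statement mode canal does not parse the SQL, so it cannot reliably extract the table name to filter on).
Check whether your code calls CanalConnector's subscribe(filter) method. If it does, that filter must match canal.instance.filter.regex in instance.properties, because the subscribe filter overrides the instance configuration; if the subscribe filter matches every table (like the all-tables pattern above), you effectively consume all change data.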
[Special note]