Product Goal
Implement real-time data capture by pulling change data from the business databases.
Version Selection
Canal-1.1.3
JDK-1.8
MySQL-5.7
Kafka-2.1.0
Overview
Canal is an open-source project from Alibaba, written in pure Java. It parses the database's incremental log (binlog) and provides incremental data subscription and consumption; currently it mainly supports MySQL (MariaDB is also supported).
Origin: in the early days, Alibaba's B2B business was deployed in two data centers, one in Hangzhou and one in the US, and therefore needed cross-data-center synchronization. Early synchronization obtained incremental changes mainly through triggers. Starting around 2010, Alibaba began to obtain incremental changes by parsing the database log instead, which gave rise to the incremental subscription & consumption model.
Businesses enabled by log-based incremental subscription & consumption:
Database mirroring
Real-time database backup
Multi-level indexes (separate sharded indexes for sellers and buyers)
Search index building (search build)
Business cache refresh
Important business messages such as price changes
How It Works
MySQL master-slave replication:
Replication happens in three steps:
1. The master records changes in its binary log (these records are called binary log events and can be inspected with show binlog events);
2. The slave copies the master's binary log events into its relay log;
3. The slave replays the events in the relay log, applying the changes to its own data.
Canal works in essentially the same way:
1. Canal speaks the MySQL slave protocol: it pretends to be a MySQL slave and sends a dump request to the MySQL master.
2. The MySQL master receives the dump request and starts pushing its binary log to the slave (i.e. to Canal).
3. Canal parses the binary log (originally a byte stream) into structured objects.
[Note]
Before using Canal, make sure the binlog is enabled:
After logging in to MySQL, run the following to check whether it is enabled:
show variables like "%log_bin%";
If it is not enabled, add the following under the [mysqld] section of /etc/my.cnf and restart MySQL:
[mysqld]
server_id=1
log_bin=mysql-bin
binlog_format=ROW
Installing and Using Canal
Downloading Canal
Canal project page: https://github.com/alibaba/canal
Since version 1.1.1, Canal supports delivering the binlog data received by the canal server directly to an MQ. The currently supported message queues are:
Kafka:https://github.com/apache/kafka
RocketMQ:https://github.com/apache/rocketmq
Choose the appropriate release and download canal.deployer-1.1.3.tar.gz. After extraction the directory contains bin/, conf/, lib/ and logs/.
Canal配置
Canal的配置分为2部分,除了系统根配置之外,还有instance级别的配置文件——每个instance一份。二者分别的位置在于:
此处附带当前本地测试中的配置,主要对Canal本体:
canal.properties:
canal.manager.jdbc.url=jdbc:mysql://127.0.0.1:3306/test_canal?useUnicode=true&characterEncoding=UTF-8
canal.manager.jdbc.username=root
canal.manager.jdbc.password=root
canal.id = 1
canal.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
canal.zkServers = hiwes:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
#################################################
#########       destinations        #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
#########              MQ              #############
##################################################
canal.mq.servers = hiwes:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# use transaction for kafka flatMessage batch produce
canal.mq.transaction = false
#canal.mq.properties. =
instance.properties:
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/test_canal
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# mq config
canal.mq.topic=canal_topic
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
If canal.serverMode is set to tcp, the server listens for clients on port 11111 by default; if serverMode is set to kafka, that client port is not used and the binlog data is delivered straight to Kafka.
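In kafka mode with canal.mq.flatMessage = true, downstream consumers read the change records as JSON strings directly from the topic instead of connecting to port 11111. Below is a minimal consumer sketch against the Kafka 2.1.0 client API; the broker hiwes:9092 and topic canal_topic come from the configuration above, while the group id canal-demo and the class name are made-up examples:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CanalTopicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hiwes:9092");          // matches canal.mq.servers above
        props.put("group.id", "canal-demo");                   // hypothetical consumer group
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("canal_topic")); // matches canal.mq.topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // each record value is one flat-message JSON string produced by canal
                    System.out.println(record.value());
                }
            }
        }
    }
}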
Using Canal
With the configuration above in place, start the service:
bin/startup.sh
Once the server is up, a client can listen for changes in the specified databases. Create a Maven project; the Canal client dependency is simple:
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.3</version>
</dependency>
Official Maven example:
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * A Canal client application (based on the official example).
 */
public class MainApp {

    public static void main(String... args) throws Exception {
        // Create the connection to the canal server
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            /*
             * If canal.instance.filter.regex was changed in instance.properties,
             * do not call connector.subscribe(".*\\..*") here, because it would
             * override the server-side filter; call connector.subscribe() with
             * no argument instead.
             */
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // acknowledge the batch
                // connector.rollback(batchId); // on failure, roll back the batch
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}
Note the newSingleConnector(...) call: it sets the canal server's IP and port (the default port is 11111), and the destination "example" must match the instance directory under conf/.
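Since the configuration above also registers the server in ZooKeeper (canal.zkServers = hiwes:2181), the client can alternatively discover the active canal server through ZooKeeper instead of hard-coding an address. A minimal sketch under that assumption; the class name ClusterClientDemo is made up:

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;

public class ClusterClientDemo {
    public static void main(String[] args) {
        // Look up the active canal server via ZooKeeper (canal.zkServers above);
        // "example" is still the instance destination.
        CanalConnector connector = CanalConnectors.newClusterConnector("hiwes:2181", "example", "", "");
        connector.connect();
        connector.subscribe();   // reuse canal.instance.filter.regex from instance.properties
        // ... same getWithoutAck / ack loop as in the example above ...
        connector.disconnect();
    }
}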
Internal format of an Entry
1. Header
    version        [protocol version, default = 1]
    logfileName    [binlog file name]
    logfileOffset  [binlog position]
    serverId       [serverId of the source server]
    serverenCode   [encoding of the changed data]
    executeTime    [execution time of the change]
    sourceType     [source of the change, default = MYSQL]
    schemaName     [schema name of the changed data]
    tableName      [table name of the changed data]
    eventLength    [length of each event]
    eventType      [insert/update/delete type, default = UPDATE]
    gtid           [gtid of the current transaction]
2. entryType       [transaction begin TRANSACTIONBEGIN / transaction end TRANSACTIONEND / ROWDATA / HEARTBEAT / GTIDLOG]
3. storeValue      [byte data, can be unpacked; the corresponding type is RowChange]
    (RowChange)
    tableId        [tableId, generated by the database]
    eventType      [type of the data change, default = UPDATE]
    isDdl          [whether this is a DDL statement, e.g. create table / drop table]
    sql            [the ddl/query SQL text]
    ddlSchemaName  [schemaName of the ddl/query; DDL can cross databases, so the schemaName it was executed under is kept]
    3.1 rowDatas   [the concrete insert/update/delete changes; can be multiple rows, since one binlog event may correspond to several changes, e.g. a batch statement]
        beforeColumns [column data of the incremental change (before update / before delete), an array of Column]
        afterColumns  [column data of the incremental change (after update / after insert), an array of Column]
        (Column)
        index      [column index]
        sqlType    [JDBC type]
        name       [column name (case-insensitive); not carried in MySQL itself]
        isKey      [whether the column is part of the primary key]
        updated    [whether the value was changed]
        isNull     [whether the value is null]
        value      [column value; timestamp and datetime are rendered as formatted text]
        length     [original length of the underlying data]
        mysqlType  [MySQL column type]
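Since beforeColumns/afterColumns are just lists of Column objects, a client typically flattens them into a name-to-value map before handing the row to downstream code. A small illustrative helper under that assumption; the class name RowDataUtil is made up, and the getter names follow the CanalEntry protobuf classes used in the example above:

import com.alibaba.otter.canal.protocol.CanalEntry;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RowDataUtil {

    /** Converts a Column list (beforeColumns or afterColumns) into a name -> value map. */
    public static Map<String, String> toMap(List<CanalEntry.Column> columns) {
        Map<String, String> row = new LinkedHashMap<>();
        for (CanalEntry.Column column : columns) {
            // null values are kept as null rather than the empty string
            row.put(column.getName(), column.getIsNull() ? null : column.getValue());
        }
        return row;
    }
}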
Test Results
With the configuration above, all tables in the local MySQL database test_canal are monitored and the change log data is written to the designated Kafka topic canal_topic. Some of the produced test records are shown below:
# Insert (detail record)
{"data":[{"id":"4","gmt_create":"2021-06-01 10:28:59","gmt_modified":"2021-06-01 10:28:59","destination":"example","binlog_file":"master.000040","binlog_offest":"223390","binlog_master_id":"1","binlog_timestamp":"1622514539000","use_schema":"test_canal","sql_schema":"test_canal","sql_table":"test","sql_text":"create table test(\nid varchar(10),\ncontent varchar(20))","sql_type":"CREATE","extra":null}],"database":"test_canal","es":1622514539000,"id":5,"isDdl":false,"mysqlType":{"id":"bigint(20) unsigned","gmt_create":"datetime","gmt_modified":"datetime","destination":"varchar(128)","binlog_file":"varchar(64)","binlog_offest":"bigint(20)","binlog_master_id":"varchar(64)","binlog_timestamp":"bigint(20)","use_schema":"varchar(1024)","sql_schema":"varchar(1024)","sql_table":"varchar(1024)","sql_text":"longtext","sql_type":"varchar(256)","extra":"text"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"gmt_create":93,"gmt_modified":93,"destination":12,"binlog_file":12,"binlog_offest":-5,"binlog_master_id":12,"binlog_timestamp":-5,"use_schema":12,"sql_schema":12,"sql_table":12,"sql_text":2005,"sql_type":12,"extra":-4},"table":"meta_history","ts":1622514540193,"type":"INSERT"}
# Insert
{"data":[{"id":"001","content":"test001"}],"database":"test_canal","es":1622514586000,"id":6,"isDdl":false,"mysqlType":{"id":"varchar(10)","content":"varchar(20)"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":12,"content":12},"table":"test","ts":1622514586845,"type":"INSERT"}
# Create table
{"data":null,"database":"test_canal","es":1622514539000,"id":5,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create table test(\nid varchar(10),\ncontent varchar(20))","sqlType":null,"table":"test","ts":1622514540193,"type":"CREATE"}
# Delete
{"data":[{"id":"10","name":"zhangpan10"}],"database":"test_canal","es":1622514296000,"id":4,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"user","ts":1622514296613,"type":"DELETE"}
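Each record above is one flat-message JSON string. Below is a minimal sketch of extracting the fields a downstream job usually needs (database, table, type, isDdl, data); it assumes the Jackson library (com.fasterxml.jackson.databind) is on the classpath, which is not part of the original setup, and the class name FlatMessageParser is made up:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class FlatMessageParser {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Prints the table, event type and changed rows of one flat-message JSON record. */
    public static void handle(String json) throws Exception {
        JsonNode root = MAPPER.readTree(json);
        String database = root.path("database").asText();
        String table = root.path("table").asText();
        String type = root.path("type").asText();      // INSERT / UPDATE / DELETE / CREATE ...
        boolean isDdl = root.path("isDdl").asBoolean();
        System.out.println(database + "." + table + " -> " + type + (isDdl ? " (DDL)" : ""));

        JsonNode data = root.path("data");             // null for DDL, an array of rows for DML
        if (data.isArray()) {
            for (JsonNode row : data) {
                System.out.println("  row: " + row.toString());
            }
        }
    }
}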
Summary
Compared with Maxwell, Canal parses the binlog into objects (the raw input is a byte stream) and lets you decide, based on the business scenario, how the parsed objects are dispatched to downstream systems (MySQL, Kafka, ES, etc.), so the data format is much more flexible. Maxwell, by contrast, is simpler to deploy and use and emits changes directly as JSON strings, so no client code has to be written; it is a better fit for projects with little infrastructure that need to iterate quickly.