Introduction to canal
Where canal came from
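In its early days, Alibaba ran database instances in two data centers, one in Hangzhou and one in the United States. It needed to synchronize data between the two, and canal grew out of that need. At first, canal captured incremental changes mainly through triggers. Starting in 2010, Alibaba gradually switched to parsing database logs to obtain incremental changes for synchronization. Incremental subscription and consumption services grew out of that work.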
canal currently supports these MySQL versions as a data source: 5.1.x, 5.5.x, 5.6.x, 5.7.x and 8.0.x.
Use cases for canal
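Common services built on incremental log subscription and consumption include:
- Incremental data subscription and consumption based on parsing the database's incremental log
- Database mirroring
- Real-time database backup
- Building and maintaining indexes in real time (split heterogeneous indexes, inverted indexes, etc.)
- Refreshing business caches
- Incremental data processing with business logic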
How canal works
Before looking at how canal works, let's first review how MySQL master-slave replication works.
How MySQL master-slave replication works
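- The MySQL master writes data changes to its binary log (binlog). The records in it are called binary log events and can be viewed with the `show binlog events` command.
- The MySQL slave copies the binary log events from the master's binary log into its own relay log.
- The MySQL slave replays the events in its relay log, applying the data changes to its own tables.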
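A toy in-memory model makes the three steps concrete. The sketch is not how MySQL is implemented. `ReplicationModel`, `Master`, `Slave` and their methods are hypothetical names chosen for illustration. The master applies each change and appends it to its binlog. The slave copies any events it has not yet seen into its relay log, remembering a read position. It then replays the relay log into its own table.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of MySQL replication; all names are hypothetical.
public class ReplicationModel {

    // One binlog event: a row change keyed by primary key.
    static final class Event {
        final String type;  // "INSERT", "UPDATE" or "DELETE"
        final String key;
        final String value; // null for DELETE

        Event(String type, String key, String value) {
            this.type = type;
            this.key = key;
            this.value = value;
        }
    }

    static void apply(Map<String, String> table, Event e) {
        if ("DELETE".equals(e.type)) {
            table.remove(e.key);
        } else { // INSERT or UPDATE
            table.put(e.key, e.value);
        }
    }

    static final class Master {
        final List<Event> binlog = new ArrayList<>();
        final Map<String, String> table = new HashMap<>();

        // Step 1: change the data and record the change in the binlog.
        void write(String type, String key, String value) {
            Event e = new Event(type, key, value);
            apply(table, e);
            binlog.add(e);
        }
    }

    static final class Slave {
        final List<Event> relayLog = new ArrayList<>();
        final Map<String, String> table = new HashMap<>();
        int readPos = 0;   // next binlog index to copy from the master
        int replayPos = 0; // next relay-log index to execute

        // Step 2: copy binlog events not seen yet into the relay log.
        void fetch(Master master) {
            while (readPos < master.binlog.size()) {
                relayLog.add(master.binlog.get(readPos++));
            }
        }

        // Step 3: replay relay-log events against the slave's own table.
        void replay() {
            while (replayPos < relayLog.size()) {
                apply(table, relayLog.get(replayPos++));
            }
        }
    }

    public static void main(String[] args) {
        Master master = new Master();
        Slave slave = new Slave();
        master.write("INSERT", "1", "Tom");
        master.write("UPDATE", "1", "Jerry");
        master.write("INSERT", "2", "Anna");
        master.write("DELETE", "2", null);
        slave.fetch(master);
        slave.replay();
        System.out.println(slave.table); // {1=Jerry}
    }
}
```

The slave's `readPos` stands in for the binlog file name and position that a real slave remembers so it can resume where it stopped.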
With MySQL replication in mind, we can guess that canal uses similar logic to implement incremental data subscription. So how does canal actually work?
The canal workflow
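- canal speaks the MySQL slave interaction protocol, presents itself as a MySQL slave, and sends a dump request to the MySQL master.
- When the MySQL master receives the dump request, it starts pushing its binary log to the slave (that is, to canal).
- canal parses the binary log objects, which arrive as raw byte streams.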
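The dump request in the first step is the COM_BINLOG_DUMP command of the MySQL client/server protocol (command byte 0x12). As an illustration only, the sketch below builds the bytes of that packet. It carries the start position, flags, the requesting slave's server id, and the binlog file name. Because the slave identifies itself by server id, canal is configured with a slaveId that must not clash with MySQL's server_id. A real client, canal included, must first complete the handshake and authentication. It also has to deal with details such as binlog checksums, all omitted here. `BinlogDumpPacket` is a hypothetical class name.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

// Builds a COM_BINLOG_DUMP packet (illustrative; no handshake, auth or checksum handling).
public class BinlogDumpPacket {
    static final byte COM_BINLOG_DUMP = 0x12;

    static byte[] build(String binlogFile, long position, int serverId) {
        byte[] name = binlogFile.getBytes(StandardCharsets.US_ASCII);
        int payloadLen = 1 + 4 + 2 + 4 + name.length;
        ByteBuffer buf = ByteBuffer.allocate(4 + payloadLen).order(ByteOrder.LITTLE_ENDIAN);
        // Packet header: 3-byte little-endian payload length + 1-byte sequence id (0 for a new command)
        buf.put((byte) (payloadLen & 0xFF));
        buf.put((byte) ((payloadLen >> 8) & 0xFF));
        buf.put((byte) ((payloadLen >> 16) & 0xFF));
        buf.put((byte) 0);
        // Payload
        buf.put(COM_BINLOG_DUMP);
        buf.putInt((int) position); // binlog position to start from (4 bytes)
        buf.putShort((short) 0);    // flags
        buf.putInt(serverId);       // the pseudo-slave's server id, e.g. canal's slaveId
        buf.put(name);              // binlog file name fills the rest of the packet
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] packet = build("mysql-bin.000004", 6160, 1234);
        System.out.println(packet.length); // 31 (4-byte header + 27-byte payload)
    }
}
```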
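This approach lets canal fetch and parse a database's incremental log and offer incremental data subscription and consumption. The result is real-time transfer of MySQL's incremental changes.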
canal works this way and is written entirely in Java, so let's learn how to use it and put it to work in real projects.
Preparing a Docker environment for canal
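Container technology is popular right now, so this article uses Docker to set up the development environment quickly. Once you can build the environment with Docker, you can set up a traditional, non-container environment yourself by following the same steps. This article focuses on canal, so Docker is covered only briefly: its basic concepts and commands.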
What is Docker
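Most readers have probably used VMware virtual machines. When you set up an environment in VMware, you supply an ordinary OS image and install it. You still have to install the remaining software and configure the applications inside the VM, just as you would on your own machine. VMware also consumes a lot of host resources, which can slow the host down, and the OS images themselves take up a lot of disk space.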
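To help you understand Docker quickly, let's compare it with VMware. Docker is a platform for developing, packaging, and running apps, and it isolates the app from the underlying infrastructure. Docker's two most important concepts are the image (similar to a VMware OS image) and the container (similar to the system installed in VMware).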
What is an image
- A collection of files and metadata (the root filesystem)
- Layered: each layer can add, change or delete files, producing a new image
- Different images can share the same layers
- An image itself is read-only
What is a container
- Created (copied) from an image
- Adds a read-write container layer on top of the image layers
- Object-oriented analogy: class and instance
- The image stores and distributes the app; the container runs it
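A small model of a copy-on-write union filesystem can illustrate the layering rules. The sketch below is hypothetical and in-memory, not Docker's implementation. An image is an immutable stack of layers that derived images share by reference. A container adds one writable layer, and reads search from the top layer down. A delete inside the container is recorded as a marker (a "whiteout") instead of modifying the read-only image.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of image layers plus a container's writable layer (hypothetical, not Docker code).
public class LayerModel {
    // Marker meaning "deleted in an upper layer"
    static final String WHITEOUT = "<deleted>";

    static final class Image {
        final List<Map<String, String>> layers; // bottom first, all read-only

        private Image(List<Map<String, String>> layers) {
            this.layers = Collections.unmodifiableList(layers);
        }

        static Image base(Map<String, String> files) {
            List<Map<String, String>> l = new ArrayList<>();
            l.add(Collections.unmodifiableMap(new HashMap<>(files)));
            return new Image(l);
        }

        // A derived image reuses (shares) the parent's layer objects and adds one new layer.
        Image withLayer(Map<String, String> changes) {
            List<Map<String, String>> l = new ArrayList<>(layers);
            l.add(Collections.unmodifiableMap(new HashMap<>(changes)));
            return new Image(l);
        }
    }

    static final class Container {
        final Image image;
        final Map<String, String> writable = new HashMap<>(); // the container layer

        Container(Image image) {
            this.image = image;
        }

        void write(String path, String content) {
            writable.put(path, content);
        }

        void delete(String path) {
            writable.put(path, WHITEOUT);
        }

        // Look in the writable layer first, then the image layers from newest to oldest.
        String read(String path) {
            String v = writable.get(path);
            for (int i = image.layers.size() - 1; i >= 0 && v == null; i--) {
                v = image.layers.get(i).get(path);
            }
            return WHITEOUT.equals(v) ? null : v;
        }
    }

    public static void main(String[] args) {
        Map<String, String> os = new HashMap<>();
        os.put("/etc/os", "debian");
        Map<String, String> app = new HashMap<>();
        app.put("/app.jar", "v1");
        Image img = Image.base(os).withLayer(app);
        Container c1 = new Container(img);
        Container c2 = new Container(img);
        c1.write("/app.jar", "patched");
        System.out.println(c1.read("/app.jar") + " " + c2.read("/app.jar")); // patched v1
    }
}
```

Both containers see the same files until one of them writes. Its writes stay in its own layer, which is the class/instance relationship described above.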
Docker networking
Docker has three built-in network types:
bridge: bridged network
By default, containers use the bridge network that Docker creates at install time. Containers get their IP addresses in the order they start, so a container's IP address can change after a restart.
none: no network
With `--network=none`, the container gets only a loopback interface and no LAN IP.
host: host network
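With `--network=host`, the container shares the host's network stack. For example, a web service in the container that listens on port 8080 is reachable directly on port 8080 of the host, with no port mapping.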
Create a custom network (so you can assign fixed IPs):
docker network create --subnet=172.18.0.0/16 mynetwork
List the existing networks with `docker network ls`:
```
hahadeMacBook-Pro:~ haha$ docker network ls
NETWORK ID     NAME        DRIVER    SCOPE
547d75277780   bridge      bridge    local
cd57bd432a9b   host        host      local
82354ec0c7d2   mynetwork   bridge    local
270f35f46b8f   none        null      local
```
Setting up the canal environment
Docker download and installation ==> Docker Download
Pull the canal image: `docker pull canal/canal-server`
```
root@default:~# docker pull canal/canal-server
Using default tag: latest
latest: Pulling from canal/canal-server
1c8f9aa56c90: Downloading  2.161 MB/69.8 MB
c5e21c824d1c: Downloading  2.047 MB/50.09 MB
4ba7edb60123: Waiting
5160a1c76c08: Waiting
adbb838a0064: Waiting
ab50c0db3511: Waiting
```
Pull the MySQL image with `docker pull mysql`. If the image is already present, the output looks like this:
```
hahadeMacBook-Pro:~ haha$ docker pull mysql
Using default tag: latest
latest: Pulling from library/mysql
Digest: sha256:415ac63da0ae6725d5aefc9669a1c02f39a00c574fdbc478dfd08db1e97c8f1b
Status: Image is up to date for mysql:latest
```
List the downloaded images with `docker images`:
```
hahadeMacBook-Pro:~ haha$ docker images
REPOSITORY                TAG                 IMAGE ID       SIZE
rabbitmq                  3.7.15-management   1482b87815ec   192MB
mysql                     latest              c7109f74d339   443MB
kibana                    7.1.1               67f17df6ca3e   746MB
elasticsearch             7.1.1               b0e9f9f047e6   894MB
canal/canal-server        latest              b9c3d95520a5   831MB
zookeeper                 3.4.13              4ebfb9474e72   150MB
mobz/elasticsearch-head   5-alpine            e2a76963bc18   78.9MB
```
Next, create the mysql and canal-server containers from these images:
```shell
## Create the mysql container
docker run -d --name mysql --net mynetwork --ip 172.18.0.6 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=root mysql
## Create the canal-server container
docker run -d --name canal-server --net mynetwork --ip 172.18.0.4 -p 11111:11111 canal/canal-server
## Options
# --net mynetwork   use the custom network
# --ip              assign a fixed IP
```
List the running containers with `docker ps`:
```
hahadeMacBook-Pro:~ haha$ docker ps
CONTAINER ID  IMAGE                             COMMAND                  CREATED       STATUS         PORTS                                                    NAMES
8254af09f19d  mysql                             "docker-entrypoint.s…"   21 hours ago  Up 11 hours    0.0.0.0:3306->3306/tcp, 33060/tcp                        mysql
4d5bf7a14209  mobz/elasticsearch-head:5-alpine  "/bin/sh -c node_mo…"    21 hours ago  Up 11 hours    0.0.0.0:9100->9100/tcp                                   elasticsearch-head
ca198845f55c  canal/canal-server                "/alidata/bin/main.s…"   21 hours ago  Up 32 minutes  2222/tcp, 8000/tcp, 11112/tcp, 0.0.0.0:11111->11111/tcp  canal-server
b5c1e3a581f4  elasticsearch:7.1.1               "/usr/local/bin/dock…"   21 hours ago  Up 11 hours    0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp           elasticsearch
```
MySQL configuration changes
So far we have only the basic environment. How do we get canal to pose as a slave and correctly receive MySQL's binary log?
For a self-hosted MySQL, you first need to enable binlog writing and set binlog-format to ROW. Enable the binlog in the MySQL configuration file: locate my.cnf with `find / -name my.cnf` and change it as follows:
```ini
[mysqld]
log-bin=mysql-bin   # enable the binlog
binlog-format=ROW   # use ROW mode
server_id=1         # required for MySQL replication; must not be the same as canal's slaveId
```
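A forgotten setting here only shows up later as a canal error, so a quick check can help. Below is a minimal sketch that assumes my.cnf contains simple `key=value` lines with `#` comments. It verifies that the three lines from the snippet above are present under `[mysqld]`: `log-bin` is set, the format is ROW, and `server_id` differs from canal's slaveId (1234 in the instance.properties used later in this article). `MyCnfCheck` and its methods are hypothetical. The parser ignores `!include` directives, and it does not account for MySQL 8 enabling the binlog by default.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal my.cnf sanity check for canal (hypothetical helper, simplified parsing).
public class MyCnfCheck {

    // Parses the [mysqld] section; MySQL treats '-' and '_' in option names alike.
    static Map<String, String> parseMysqld(String cnf) {
        Map<String, String> opts = new HashMap<>();
        boolean inMysqld = false;
        for (String raw : cnf.split("\\R")) {
            String line = raw.split("#", 2)[0].trim(); // drop trailing comments
            if (line.isEmpty()) {
                continue;
            }
            if (line.startsWith("[")) {
                inMysqld = line.equals("[mysqld]");
                continue;
            }
            if (inMysqld) {
                String[] kv = line.split("=", 2);
                opts.put(kv[0].trim().replace('-', '_'), kv.length > 1 ? kv[1].trim() : "");
            }
        }
        return opts;
    }

    // Returns the problems found; an empty list means the three settings look right.
    static List<String> check(String cnf, int canalSlaveId) {
        Map<String, String> o = parseMysqld(cnf);
        List<String> problems = new ArrayList<>();
        if (!o.containsKey("log_bin")) {
            problems.add("log-bin is not set");
        }
        if (!"ROW".equalsIgnoreCase(o.get("binlog_format"))) {
            problems.add("binlog-format is not ROW");
        }
        String serverId = o.get("server_id");
        if (serverId == null) {
            problems.add("server_id is not set");
        } else if (Integer.parseInt(serverId) == canalSlaveId) {
            problems.add("server_id clashes with canal's slaveId");
        }
        return problems;
    }

    public static void main(String[] args) {
        String cnf = "[mysqld]\n"
                + "log-bin=mysql-bin # enable binlog\n"
                + "binlog-format=ROW # choose ROW mode\n"
                + "server_id=1\n";
        System.out.println(check(cnf, 1234)); // []
    }
}
```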
Enter the mysql container: `docker exec -it mysql bash`
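Create an account named canal for connecting to MySQL and grant it the privileges of a MySQL slave. If you already have an account, you can run GRANT on it directly.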
```sql
mysql -uroot -proot
# Create the account
CREATE USER canal IDENTIFIED BY 'canal';
# Grant privileges
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
# Flush and apply
FLUSH PRIVILEGES;
```
After restarting the database, quickly check that the my.cnf settings took effect:
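```
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.01 sec)

mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)

mysql> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000004 |     6160 |              |                  |                   |
+------------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)

mysql> show master status\G
*************************** 1. row ***************************
             File: mysql-bin.000004
         Position: 6160
     Binlog_Do_DB:
 Binlog_Ignore_DB:
Executed_Gtid_Set:
1 row in set (0.00 sec)
```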
```sql
show variables like 'log_bin';
show variables like 'binlog_format';
show master status;
```
canal-server configuration changes
Enter the canal-server container:
docker exec -it canal-server bash
Edit the canal-server instance configuration:
vi canal-server/conf/example/instance.properties
```properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=1234

# enable gtid use true/false
canal.instance.gtidon=false

# position info  (change to your own database settings)
canal.instance.master.address=172.18.0.6:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password  (change to your own database settings)
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
#canal.instance.defaultDatabaseName=studentdb
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFWWDQYJKoZIhVCNAQEBBQADSWAWSAJBALK4BUxdD1tRRE5/zXpVEVPUgunvscYFtEip3pmL1hrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfi
```
For more settings, see ==> canal configuration reference
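The connection settings in this file are ordinary Java properties. As an illustration that is not part of canal, the sketch below loads such text with `java.util.Properties` and splits `canal.instance.master.address` into host and port. This is a quick way to sanity-check an edit before restarting the container. `InstanceProps` is a hypothetical name, and the text it loads is an excerpt of the settings above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.net.InetSocketAddress;
import java.util.Properties;

// Reads canal.instance.master.address from instance.properties content (illustrative helper).
public class InstanceProps {

    static InetSocketAddress masterAddress(Properties p) {
        String addr = p.getProperty("canal.instance.master.address", "").trim();
        int colon = addr.lastIndexOf(':');
        if (colon <= 0 || colon == addr.length() - 1) {
            throw new IllegalArgumentException("expected host:port but got '" + addr + "'");
        }
        String host = addr.substring(0, colon);
        int port = Integer.parseInt(addr.substring(colon + 1));
        // createUnresolved skips DNS: we only validate the configured value
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) throws IOException {
        String text = "canal.instance.mysql.slaveId=1234\n"
                + "canal.instance.master.address=172.18.0.6:3306\n"
                + "canal.instance.dbUsername=canal\n";
        Properties p = new Properties();
        p.load(new StringReader(text)); // the first '=' ends the key, so the ':' stays in the value
        InetSocketAddress a = masterAddress(p);
        System.out.println(a.getHostString() + " " + a.getPort()); // 172.18.0.6 3306
        System.out.println(p.getProperty("canal.instance.mysql.slaveId")); // 1234
    }
}
```

In the properties format, `#` starts a comment only at the beginning of a line. A comment placed after a value on the same line becomes part of that value.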
Restart the canal-server container: `docker restart canal-server`
Enter the container and check the startup log:
```shell
docker exec -it canal-server bash
tail -100f canal-server/logs/example/example.log
```
```
2019-07-09 17:33:09.620 [destination = example , address = /172.18.0.6:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000004,position=1899,serverId=1,gtid=,timestamp=1562655804000] cost : 120ms , the next step is binlog dump
2019-07-09 21:26:32.816 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2019-07-09 21:26:32.822 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2019-07-09 21:26:33.304 [main] WARN  o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
2019-07-09 21:26:33.429 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2019-07-09 21:26:33.430 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2019-07-09 21:26:33.986 [main] ERROR com.alibaba.druid.pool.DruidDataSource - testWhileIdle is true, validationQuery not set
2019-07-09 21:26:34.729 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2019-07-09 21:26:34.752 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2019-07-09 21:26:34.753 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
2019-07-09 21:26:34.794 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2019-07-09 21:26:35.107 [destination = example , address = /172.18.0.6:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
```
The environment is now ready!
Pulling data and syncing it to Elasticsearch
Elasticsearch in this article also runs in Docker, so you can run the following commands:
```shell
# Pull the images
docker pull elasticsearch:7.1.1
docker pull mobz/elasticsearch-head:5-alpine
# Create and start the containers
docker run -d --name elasticsearch --net mynetwork --ip 172.18.0.2 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.1.1
docker run -d --name elasticsearch-head --net mynetwork --ip 172.18.0.5 -p 9100:9100 mobz/elasticsearch-head:5-alpine
```
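With the environment ready, we can move on to the hands-on coding: how an application gets the binlog data that canal has parsed. First, build a canal demo application on Spring Boot. It consists of the following classes: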
Student.java
```java
package com.example.canal.study.pojo;

import lombok.Data;

import java.io.Serializable;

/**
 * Plain domain entity
 * @Data generates the getters and setters
 */
@Data
public class Student implements Serializable {
    private String id;
    private String name;
    private int age;
    private String sex;
    private String city;
}
```
CanalConfig.java
```java
package com.example.canal.study.common;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.InetSocketAddress;

/**
 * Configuration values and shared beans related to canal
 * @author haha
 */
@Configuration
public class CanalConfig {
    // @Value reads the corresponding entries from application.properties
    @Value("${canal.server.ip}")
    private String canalIp;
    @Value("${canal.server.port}")
    private Integer canalPort;
    @Value("${canal.destination}")
    private String destination;
    @Value("${elasticSearch.server.ip}")
    private String elasticSearchIp;
    @Value("${elasticSearch.server.port}")
    private Integer elasticSearchPort;
    @Value("${zookeeper.server.ip}")
    private String zkServerIp;

    /**
     * Simple, direct connection to a single canal-server
     */
    @Bean
    public CanalConnector canalSimpleConnector() {
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(canalIp, canalPort), destination, "", "");
        return canalConnector;
    }

    /**
     * canal-server connection obtained through ZooKeeper
     */
    @Bean
    public CanalConnector canalHaConnector() {
        CanalConnector canalConnector = CanalConnectors.newClusterConnector(zkServerIp, destination, "", "");
        return canalConnector;
    }

    /**
     * Elasticsearch 7.x client
     */
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost(elasticSearchIp, elasticSearchPort))
        );
        return client;
    }
}
```
CanalDataParser.java
This class is fairly long, so only its most important parts are excerpted here. The rest of the code is available on GitHub.
```java
/**
 * A simple two-element tuple
 * @param <A> type of the event type
 * @param <B> type of the column map
 */
public static class TwoTuple<A, B> {
    public final A eventType;
    public final B columnMap;

    public TwoTuple(A a, B b) {
        eventType = a;
        columnMap = b;
    }
}

/**
 * Parses the entries of a canal Message
 * @param entrys
 * @return
 */
public static List<TwoTuple<EventType, Map>> printEntry(List<Entry> entrys) {
    List<TwoTuple<EventType, Map>> rows = new ArrayList<>();
    for (Entry entry : entrys) {
        // Time at which the binlog event was executed
        long executeTime = entry.getHeader().getExecuteTime();
        // Delay between the event's execution and this application receiving it
        long delayTime = System.currentTimeMillis() - executeTime;
        Date date = new Date(entry.getHeader().getExecuteTime());
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // The current entry (binary log event) marks a transaction boundary
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
                TransactionBegin begin = null;
                try {
                    begin = TransactionBegin.parseFrom(entry.getStoreValue());
                } catch (InvalidProtocolBufferException e) {
                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                }
                // Print the transaction header: executing thread id and transaction latency
                logger.info(transaction_format, new Object[]{entry.getHeader().getLogfileName(),
                        String.valueOf(entry.getHeader().getLogfileOffset()),
                        String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
                        entry.getHeader().getGtid(), String.valueOf(delayTime)});
                logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
                printXAInfo(begin.getPropsList());
            } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
                TransactionEnd end = null;
                try {
                    end = TransactionEnd.parseFrom(entry.getStoreValue());
                } catch (InvalidProtocolBufferException e) {
                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                }
                // Print the transaction commit info: transaction id
                logger.info("----------------\n");
                logger.info(" END ----> transaction id: {}", end.getTransactionId());
                printXAInfo(end.getPropsList());
                logger.info(transaction_format, new Object[]{entry.getHeader().getLogfileName(),
                        String.valueOf(entry.getHeader().getLogfileOffset()),
                        String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
                        entry.getHeader().getGtid(), String.valueOf(delayTime)});
            }
            continue;
        }
        // The current entry (binary log event) carries row data
        if (entry.getEntryType() == EntryType.ROWDATA) {
            RowChange rowChage = null;
            try {
                // Deserialize the stored content
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
            }
            // Event type of this change
            EventType eventType = rowChage.getEventType();
            logger.info(row_format, new Object[]{entry.getHeader().getLogfileName(),
                    String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
                    entry.getHeader().getTableName(), eventType,
                    String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
                    entry.getHeader().getGtid(), String.valueOf(delayTime)});
            // For QUERY events or DDL, just print the SQL and continue with the next entry
            if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
                logger.info(" sql ----> " + rowChage.getSql() + SEP);
                continue;
            }
            printXAInfo(rowChage.getPropsList());
            // Iterate over the rows contained in this change
            for (RowData rowData : rowChage.getRowDatasList()) {
                List<CanalEntry.Column> columns;
                // DELETE: use the columns as they were before the change; otherwise use the columns after it
                if (eventType == CanalEntry.EventType.DELETE) {
                    columns = rowData.getBeforeColumnsList();
                } else {
                    columns = rowData.getAfterColumnsList();
                }
                HashMap<String, Object> map = new HashMap<>(16);
                // Put each column's name and value into the map
                for (Column column : columns) {
                    map.put(column.getName(), column.getValue());
                }
                rows.add(new TwoTuple<>(eventType, map));
            }
        }
    }
    return rows;
}
```
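The excerpt copies `column.getValue()` straight into the map. canal's `CanalEntry.Column` also exposes `getIsNull()`. For a SQL NULL the value string carries no usable data, so a NULL `age` would later make `Integer.parseInt` in a converter like `createStudent` fail. The sketch below shows one way to handle that. To stay self-contained, it uses a hypothetical stand-in class `Col` that holds the same three pieces of information instead of the real protobuf type. `toMap` and `toInt` are illustrative helper names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in sketch of the row-to-map step (hypothetical types; the real code uses CanalEntry.Column).
public class RowMapping {

    // Mirrors what the loop reads from CanalEntry.Column: getName(), getValue(), getIsNull().
    static final class Col {
        final String name;
        final String value;
        final boolean isNull;

        Col(String name, String value, boolean isNull) {
            this.name = name;
            this.value = value;
            this.isNull = isNull;
        }
    }

    // DELETE rows are read from the before image; INSERT/UPDATE from the after image.
    static Map<String, Object> toMap(String eventType, List<Col> before, List<Col> after) {
        List<Col> cols = "DELETE".equals(eventType) ? before : after;
        Map<String, Object> map = new HashMap<>();
        for (Col c : cols) {
            // Keep SQL NULL as Java null instead of an empty string
            map.put(c.name, c.isNull ? null : c.value);
        }
        return map;
    }

    // Null-safe int conversion for fields such as Student.age
    static int toInt(Object v, int fallback) {
        if (v == null) {
            return fallback;
        }
        String s = v.toString().trim();
        return s.isEmpty() ? fallback : Integer.parseInt(s);
    }

    public static void main(String[] args) {
        List<Col> after = new ArrayList<>();
        after.add(new Col("id", "1", false));
        after.add(new Col("age", "", true)); // a NULL age column
        Map<String, Object> row = toMap("INSERT", new ArrayList<>(), after);
        System.out.println(row.get("age") + " " + toInt(row.get("age"), 0)); // null 0
    }
}
```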
ElasticUtils.java
```java
package com.example.canal.study.common;

import com.alibaba.fastjson.JSON;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * CRUD helper for Elasticsearch
 * @author haha
 */
@Slf4j
@Component
public class ElasticUtils {
    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /**
     * Create a document
     * @param student
     * @param index index name
     */
    public void saveEs(Student student, String index) {
        IndexRequest indexRequest = new IndexRequest(index)
                .id(student.getId())
                .source(JSON.toJSONString(student), XContentType.JSON)
                .opType(DocWriteRequest.OpType.CREATE);
        try {
            IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
            log.info("Saved document to Elasticsearch: {}", response.getId());
        } catch (Exception e) {
            // Passing the exception as the last argument makes SLF4J log the stack trace
            log.error("Failed to save document to Elasticsearch", e);
        }
    }

    /**
     * Get a document
     * @param index index name
     * @param id    _id
     */
    public void getEs(String index, String id) {
        GetRequest getRequest = new GetRequest(index, id);
        try {
            GetResponse response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
            // getSource() returns null when the document does not exist
            if (!response.isExists()) {
                log.info("Document {} not found in index {}", id, index);
                return;
            }
            Map<String, Object> fields = response.getSource();
            for (Map.Entry<String, Object> entry : fields.entrySet()) {
                System.out.println(entry.getKey() + ":" + entry.getValue());
            }
        } catch (Exception e) {
            log.error("Failed to get document from Elasticsearch", e);
        }
    }

    /**
     * Update a document
     * @param student
     * @param index index name
     */
    public void updateEs(Student student, String index) {
        UpdateRequest updateRequest = new UpdateRequest(index, student.getId());
        updateRequest.doc(JSON.toJSONString(student), XContentType.JSON);
        try {
            UpdateResponse response = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
            log.info("Updated document in Elasticsearch: {}", response.getId());
        } catch (Exception e) {
            log.error("Failed to update document in Elasticsearch", e);
        }
    }

    /**
     * Delete a document by id
     * @param index index name
     * @param id    _id
     */
    public void DeleteEs(String index, String id) {
        DeleteRequest deleteRequest = new DeleteRequest(index, id);
        try {
            DeleteResponse response = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
            log.info("Deleted document from Elasticsearch: {}", response.getId());
        } catch (Exception e) {
            log.error("Failed to delete document from Elasticsearch", e);
        }
    }
}
```
BinLogElasticSearch.java
```java
package com.example.canal.study.action;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.example.canal.study.common.CanalDataParser;
import com.example.canal.study.common.ElasticUtils;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
 * Fetches binlog data and sends it to Elasticsearch
 *
 * @author haha
 */
@Slf4j
@Component
public class BinLogElasticSearch {
    @Autowired
    private CanalConnector canalSimpleConnector;
    @Autowired
    private ElasticUtils elasticUtils;
    // @Qualifier("canalHaConnector") selects the bean named canalHaConnector
    @Autowired
    @Qualifier("canalHaConnector")
    private CanalConnector canalHaConnector;

    public void binLogToElasticSearch() throws IOException {
        openCanalConnector(canalSimpleConnector);
        // Poll for data
        int batchSize = 5 * 1024;
        while (true) {
            // Message message = canalHaConnector.getWithoutAck(batchSize);
            Message message = canalSimpleConnector.getWithoutAck(batchSize);
            long id = message.getId();
            int size = message.getEntries().size();
            log.info("Number of binlog entries received: {}", size);
            if (id == -1 || size == 0) {
                try {
                    // Nothing new: wait 4 seconds
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                continue;
            }
            try {
                // 1. Parse the message
                List<CanalEntry.Entry> entries = message.getEntries();
                List<CanalDataParser.TwoTuple<CanalEntry.EventType, Map>> rows = CanalDataParser.printEntry(entries);
                for (CanalDataParser.TwoTuple<CanalEntry.EventType, Map> tuple : rows) {
                    // 2. Sync each parsed row to Elasticsearch
                    if (tuple.eventType == CanalEntry.EventType.INSERT) {
                        elasticUtils.saveEs(createStudent(tuple), "student_index");
                    } else if (tuple.eventType == CanalEntry.EventType.UPDATE) {
                        elasticUtils.updateEs(createStudent(tuple), "student_index");
                    } else if (tuple.eventType == CanalEntry.EventType.DELETE) {
                        elasticUtils.DeleteEs("student_index", tuple.columnMap.get("id").toString());
                    }
                }
                // 3. Acknowledge the batch once, after all of its rows have been handled
                canalSimpleConnector.ack(id);
                // canalHaConnector.ack(id);
            } catch (RuntimeException e) {
                log.error("Failed to process batch {}, rolling back", id, e);
                // Roll back so the unacknowledged batch is delivered again
                canalSimpleConnector.rollback(id);
                // canalHaConnector.rollback(id);
            }
        }
    }

    /**
     * Builds a Student object from the parsed row
     *
     * @param tuple
     * @return
     */
    private Student createStudent(CanalDataParser.TwoTuple<CanalEntry.EventType, Map> tuple) {
        Student student = new Student();
        student.setId(tuple.columnMap.get("id").toString());
        student.setAge(Integer.parseInt(tuple.columnMap.get("age").toString()));
        student.setName(tuple.columnMap.get("name").toString());
        student.setSex(tuple.columnMap.get("sex").toString());
        student.setCity(tuple.columnMap.get("city").toString());
        return student;
    }

    /**
     * Opens the canal connection
     *
     * @param canalConnector
     */
    private void openCanalConnector(CanalConnector canalConnector) {
        // Connect to the canal server
        canalConnector.connect();
        // Subscribe to the destination
        canalConnector.subscribe();
    }

    /**
     * Closes the canal connection
     *
     * @param canalConnector
     */
    private void closeCanalConnector(CanalConnector canalConnector) {
        // Cancel the destination subscription while still connected
        canalConnector.unsubscribe();
        // then disconnect from the canal server
        canalConnector.disconnect();
    }
}
```
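canal's client API works in batches. `getWithoutAck` hands out a batch with an id, `ack(batchId)` confirms it, and `rollback` causes unconfirmed data to be delivered again. To show this contract without a running server, the sketch below uses a hypothetical in-memory `FakeConnector` that mimics those three calls. Its ordering and error rules are simplified assumptions, not canal's code. The poll step handles every entry of a batch before acknowledging the batch exactly once. On failure it rolls back, so the whole batch is delivered again.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// In-memory stand-in for the getWithoutAck/ack/rollback contract (hypothetical, not canal's classes).
public class AckLoopSketch {

    static final class Batch {
        final long id;
        final List<String> entries;
        final int end; // log position just after this batch

        Batch(long id, List<String> entries, int end) {
            this.id = id;
            this.entries = entries;
            this.end = end;
        }
    }

    static final class FakeConnector {
        private final List<String> log;
        private int confirmed = 0; // position after the last acknowledged entry (the "cursor")
        private int fetched = 0;   // position after the last handed-out entry
        private long nextId = 1;
        private final Deque<Batch> outstanding = new ArrayDeque<>();

        FakeConnector(List<String> log) {
            this.log = log;
        }

        Batch getWithoutAck(int batchSize) {
            int end = Math.min(log.size(), fetched + batchSize);
            if (end == fetched) {
                return new Batch(-1, new ArrayList<>(), end); // nothing new: id -1, like the check in the article
            }
            Batch b = new Batch(nextId++, new ArrayList<>(log.subList(fetched, end)), end);
            fetched = end;
            outstanding.addLast(b);
            return b;
        }

        // Only the oldest outstanding batch may be acknowledged, and only once.
        void ack(long id) {
            Batch oldest = outstanding.peekFirst();
            if (oldest == null || oldest.id != id) {
                throw new IllegalStateException("batch " + id + " is not the oldest outstanding batch");
            }
            outstanding.removeFirst();
            confirmed = oldest.end;
        }

        // Forget all unacknowledged batches; the next fetch restarts at the cursor.
        void rollback() {
            outstanding.clear();
            fetched = confirmed;
        }
    }

    // One poll: handle the whole batch, then ack once; on failure roll back. Returns false when idle.
    static boolean pollOnce(FakeConnector c, int batchSize, Consumer<String> handler) {
        Batch b = c.getWithoutAck(batchSize);
        if (b.id == -1) {
            return false;
        }
        try {
            for (String e : b.entries) {
                handler.accept(e);
            }
            c.ack(b.id);
        } catch (RuntimeException ex) {
            c.rollback();
        }
        return true;
    }
}
```

Delivery is at-least-once: entries handled before a failure arrive again after the rollback. The Elasticsearch writes should therefore be idempotent. Updates and deletes by row id mostly are. The `OpType.CREATE` used in `saveEs`, however, logs a conflict for a document that already exists.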
CanalDemoApplication.java (Spring Boot entry class)
```java
package com.example.canal.study;

import com.example.canal.study.action.BinLogElasticSearch;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Application entry point
 * @author haha
 */
@SpringBootApplication
public class CanalDemoApplication implements ApplicationRunner {
    @Autowired
    private BinLogElasticSearch binLogElasticSearch;

    public static void main(String[] args) {
        SpringApplication.run(CanalDemoApplication.class, args);
    }

    // run() is executed once the application has started
    @Override
    public void run(ApplicationArguments args) throws Exception {
        binLogElasticSearch.binLogToElasticSearch();
    }
}
```
application.properties
```properties
server.port=8081
spring.application.name = canal-demo

canal.server.ip = localhost
canal.server.port = 11111
canal.destination = example

zookeeper.server.ip = localhost:2181
zookeeper.sasl.client = false

elasticSearch.server.ip = localhost
elasticSearch.server.port = 9200
```
Setting up a highly available canal cluster
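So far we have used canal with a single, directly connected server. In production, clustered high-availability deployments are replacing single instances, so how do we set up canal as a multi-instance cluster?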
Obtaining the canal instance through ZooKeeper
Prepare the ZooKeeper Docker image and container, plus a second canal-server container:
```shell
docker pull zookeeper
docker run -d --name zookeeper --net mynetwork --ip 172.18.0.3 -p 2181:2181 zookeeper
docker run -d --name canal-server2 --net mynetwork --ip 172.18.0.8 -p 11113:11113 canal/canal-server
```
The result:
```
hahadeMacBook-Pro:~ haha$ docker ps
CONTAINER ID  IMAGE                             COMMAND                  CREATED       STATUS       PORTS                                                          NAMES
3f920cfac809  zookeeper                         "/docker-entrypoint.…"   13 hours ago  Up 13 hours  2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp           zookeeper
75f69fbd93a9  canal-server2:1.0                 "/alidata/bin/main.s…"   22 hours ago  Up 12 hours  2222/tcp, 8000/tcp, 11111-11112/tcp, 0.0.0.0:11113->11113/tcp  canal-server2
8254af09f19d  mysql                             "docker-entrypoint.s…"   47 hours ago  Up 13 hours  0.0.0.0:3306->3306/tcp, 33060/tcp                              mysql
4d5bf7a14209  mobz/elasticsearch-head:5-alpine  "/bin/sh -c node_mo…"    47 hours ago  Up 13 hours  0.0.0.0:9100->9100/tcp                                         elasticsearch-head
ca198845f55c  canal/canal-server                "/alidata/bin/main.s…"   47 hours ago  Up 2 hours   2222/tcp, 8000/tcp, 11112/tcp, 0.0.0.0:11111->11111/tcp        canal-server
b5c1e3a581f4  elasticsearch:7.1.1               "/usr/local/bin/dock…"   47 hours ago  Up 13 hours  0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp                 elasticsearch
```
- Machines
- canal container IPs: 172.18.0.4, 172.18.0.8
- ZooKeeper container: 172.18.0.3:2181
- MySQL container: 172.18.0.6:3306
- Deploy and configure each machine as described above; the demo uses the instance name example
- Edit canal.properties to add the ZooKeeper settings and change the canal port
```properties
canal.port=11113
canal.zkServers=172.18.0.3:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
```
- Create the example directory and edit its instance.properties
```properties
# The first canal used slaveId 1234; any value works as long as it is unique
canal.instance.mysql.slaveId = 1235
canal.instance.master.address = 172.18.0.6:3306
```
Note: the instance directory names on the two machines must be exactly the same, because HA mode manages instances by instance name. Both machines must also use default-instance.xml.
Start canal in both containers. If you check the startup logs with `tail -100f logs/example/example.log`, only one of the two machines shows a successful start.
In my case, the one that started successfully was 172.18.0.4:
```
2019-07-09 21:26:36.215 [destination = example , address = /172.18.0.6:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000004,position=2098,serverId=1,gtid=,timestamp=1562672817000] cost : 1085ms , the next step is binlog dump
2019-07-09 22:21:27.458 [Thread-7] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - stop CannalInstance for null-example
2019-07-09 22:21:27.586 [Thread-7] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - stop successful....
2019-07-09 22:57:58.581 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2019-07-09 22:57:58.602 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2019-07-09 22:57:59.278 [main] WARN  o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
2019-07-09 22:57:59.366 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2019-07-09 22:57:59.367 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2019-07-09 22:57:59.836 [main] ERROR com.alibaba.druid.pool.DruidDataSource - testWhileIdle is true, validationQuery not set
2019-07-09 22:58:00.497 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2019-07-09 22:58:00.527 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2019-07-09 22:58:00.527 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
2019-07-09 22:58:00.569 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2019-07-09 22:58:00.894 [destination = example , address = /172.18.0.6:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2019-07-09 22:58:00.900 [destination = example , address = /172.18.0.6:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
```
The node data in ZooKeeper also shows that the active node is 172.18.0.4:11111:
```
[zk: localhost:2181(CONNECTED) 15] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.4:11111","cid":1}
```
Client connection and data consumption
Given the ZooKeeper address and the canal instance name, the canal client reads the running node in ZooKeeper to find the currently active server and connects to it automatically:
```
[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.4:11111","cid":1}
```
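A rough picture of what the client does with that node: read the JSON, check that it is active, and turn `address` into a socket address. This is a minimal sketch that assumes the node content has exactly the shape shown above. It uses a regular expression only to stay dependency-free. A real client should use a JSON parser, and canal's own client handles all of this internally. `RunningNode` is a hypothetical name.

```java
import java.net.InetSocketAddress;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Extracts the active canal server from the running node's JSON (illustrative only).
public class RunningNode {
    private static final Pattern ADDRESS = Pattern.compile("\"address\"\\s*:\\s*\"([^\"]+):(\\d+)\"");
    private static final Pattern ACTIVE = Pattern.compile("\"active\"\\s*:\\s*true");

    // Returns the active server's address, or null if the node is not marked active.
    static InetSocketAddress parse(String json) {
        if (!ACTIVE.matcher(json).find()) {
            return null;
        }
        Matcher m = ADDRESS.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("no address in: " + json);
        }
        return InetSocketAddress.createUnresolved(m.group(1), Integer.parseInt(m.group(2)));
    }

    public static void main(String[] args) {
        InetSocketAddress a = parse("{\"active\":true,\"address\":\"172.18.0.4:11111\",\"cid\":1}");
        System.out.println(a.getHostString() + ":" + a.getPort()); // 172.18.0.4:11111
    }
}
```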
The client code can take the following form. The canalHaConnector bean in CanalConfig.java above is such an HA connection:
CanalConnector connector = CanalConnectors.newClusterConnector("172.18.0.3:2181", "example", "", "");
Once the client is connected, the canal server records information about the active canal client, such as its IP and port. (You may have guessed that the canal client supports HA as well.)
```
[zk: localhost:2181(CONNECTED) 4] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"192.168.124.5:59887","clientId":1001}
```
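After data is consumed successfully, the canal server records in ZooKeeper the binlog position of the last successful consumption. When you restart the client, it resumes from that position.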
```
[zk: localhost:2181(CONNECTED) 5] get /otter/canal/destinations/example/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"mysql.mynetwork","port":3306}},"postion":{"included":false,"journalName":"binlog.000004","position":2169,"timestamp":1562672817000}}
```
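Resuming from the cursor works because binlog positions are totally ordered: first by file (the numeric suffix of `journalName`, as in `binlog.000004`), then by offset within the file. Below is a minimal comparator that assumes file names of the form `prefix.NNNNNN`, as in this article. `BinlogPos` is a hypothetical class, not canal's `LogPosition`.

```java
// Orders binlog positions by file index, then offset (hypothetical class for illustration).
public class BinlogPos implements Comparable<BinlogPos> {
    final String journalName;
    final long position;

    BinlogPos(String journalName, long position) {
        this.journalName = journalName;
        this.position = position;
    }

    // Numeric suffix after the last '.', e.g. "binlog.000004" -> 4
    static long fileIndex(String journalName) {
        return Long.parseLong(journalName.substring(journalName.lastIndexOf('.') + 1));
    }

    @Override
    public int compareTo(BinlogPos o) {
        int byFile = Long.compare(fileIndex(journalName), fileIndex(o.journalName));
        return byFile != 0 ? byFile : Long.compare(position, o.position);
    }

    @Override
    public String toString() {
        return journalName + ":" + position;
    }

    public static void main(String[] args) {
        BinlogPos cursor = new BinlogPos("binlog.000004", 2169);
        BinlogPos next = new BinlogPos("binlog.000005", 4);
        System.out.println(cursor.compareTo(next) < 0); // true
    }
}
```

The suffix is compared as a number rather than as a string so the order stays correct even if the suffix grows beyond six digits.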
Stop the canal server that is currently active on 172.18.0.4:
```shell
docker exec -it canal-server bash
cd canal-server/bin
sh stop.sh
```
172.18.0.8 immediately starts the example instance and takes over serving data:
```
[zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.8:11111","cid":1}
```
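At the same time, the client follows the canal server switchover. It reads the latest address from ZooKeeper, connects to the new canal server, and continues consuming data. The whole process is automatic.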
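This failover follows the usual ZooKeeper pattern. Every server tries to create the same ephemeral running node, and exactly one succeeds. The others watch the node and retry when it disappears; an ephemeral node vanishes when its owner's session ends. The sketch below imitates this with an in-memory map, where `putIfAbsent` plays the role of "create the node if absent". It is a hypothetical model for illustration only, not canal's or ZooKeeper's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory imitation of running-node election and failover (hypothetical model).
public class RunningNodeElection {
    // path -> owner address; stands in for ZooKeeper's ephemeral nodes
    final ConcurrentMap<String, String> nodes = new ConcurrentHashMap<>();
    final String path = "/otter/canal/destinations/example/running";

    // "Create the ephemeral node": succeeds for exactly one server.
    boolean tryStart(String server) {
        return nodes.putIfAbsent(path, server) == null;
    }

    // The owner's session ends, so its ephemeral node disappears (no effect for non-owners).
    void sessionExpired(String server) {
        nodes.remove(path, server);
    }

    // Standby servers watching the node retry once it is gone.
    void electAmong(List<String> servers) {
        for (String s : servers) {
            if (tryStart(s)) {
                return;
            }
        }
    }

    // What a client reads to find the server to connect to.
    String active() {
        return nodes.get(path);
    }

    public static void main(String[] args) {
        RunningNodeElection zk = new RunningNodeElection();
        List<String> servers = new ArrayList<>();
        servers.add("172.18.0.4:11111");
        servers.add("172.18.0.8:11111");
        zk.electAmong(servers);
        System.out.println(zk.active()); // 172.18.0.4:11111
        zk.sessionExpired("172.18.0.4:11111");
        servers.remove("172.18.0.4:11111");
        zk.electAmong(servers);
        System.out.println(zk.active()); // 172.18.0.8:11111
    }
}
```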
Problems and summary
elasticsearch-head cannot access Elasticsearch
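es and es-head are two separate processes, so when es-head calls the es service it runs into a cross-origin (CORS) problem. To fix it, add the following settings to the es configuration file: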
```shell
[root@localhost /usr/local/elasticsearch-head-master]# cd ../elasticsearch-5.5.2/config/
[root@localhost /usr/local/elasticsearch-5.5.2/config]# vim elasticsearch.yml
# Append the following to the end of the file
http.cors.enabled: true
http.cors.allow-origin: "*"
```
Restart the es service after changing the configuration file.
elasticsearch-head queries return 406 Not Acceptable
```
▼ General
Request URL: http://127.0.0.1:9200/student_index/_search
Request Method: POST
Status Code: 406 Not Acceptable
Remote Address: 127.0.0.1:9200
Referrer Policy: no-referrer-when-downgrade
▼ Response Headers
access-control-allow-origin: *
content-encoding: gzip
```
Fix:
1. Go to the head installation directory.
2. `cd _site/`
3. Edit vendor.js in two places:
```javascript
// line 6886
contentType: "application/x-www-form-urlencoded"
// change to
contentType: "application/json;charset=UTF-8"

// line 7574
var inspectData = s.contentType === "application/x-www-form-urlencoded" &&
// change to
var inspectData = s.contentType === "application/json;charset=UTF-8" &&
```
elasticsearch-rest-high-level-client reports an error about org.elasticsearch.action.index.IndexRequest.ifSeqNo
```xml
<!-- Besides this dependency in the pom -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.1.1</version>
</dependency>
<!-- you also need to add -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.1.1</version>
</dependency>
```
Related reference: GitHub issues
Why can't types be used in Elasticsearch 7.x?
Reference: Why did Elasticsearch remove types in 7.x?
spring-data-elasticsearch.jar throws org.elasticsearch.client.transport.NoNodeAvailableException
This article uses Elasticsearch 7.x. At the time of writing, spring-data-elasticsearch was built on Elastic's TransportClient, which Elastic plans to abandon. Use the officially recommended RestHighLevelClient instead; see the RestHighLevelClient API.
Starting Docker containers automatically
If a container was created without `--restart=always`, you can add the policy later with the update command: `docker update --restart=always [containerID]`
Docker for Mac: host network mode has no effect
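Host mode exists for performance, but it breaks Docker's isolation and therefore reduces security. In performance-sensitive scenarios you can enable it with `--network host`. If you run containers locally on Windows or Mac, however, host mode does not work, because it is supported only on Linux hosts.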
See the official documentation: docs.docker.com/network/hos…
Client connecting to ZooKeeper reports "authenticate using SASL (unknown error)"
```
Opening socket connection to server 192.168.124.5/192.168.124.5:2181. Will not attempt to authenticate using SASL (unknown error)
Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
```
Possible causes:
- The zookeeper.jar version does not match the ZooKeeper version running in Docker
- zookeeper.jar is a version earlier than 3.4.6
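The message means that the ZooKeeper client, acting as an external application, tries to authenticate when it requests resources, and SASL is the authentication mechanism involved. We can bypass SASL authentication to avoid the wait and speed up connecting.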
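Add `System.setProperty("zookeeper.sasl.client", "false");` to the project code. In a Spring Boot project you can also add `zookeeper.sasl.client=false` to application.properties. Note, however, that Spring Boot does not copy application.properties entries into JVM system properties, while the ZooKeeper client reads this flag from system properties. If the entry alone has no effect, call System.setProperty before the connector is created, or start the JVM with `-Dzookeeper.sasl.client=false`.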
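The ZooKeeper client reads this flag from JVM system properties, so the value must be in place before any ZooKeeper-backed connector is created. The sketch below assumes you keep the flag in your application-level properties. A hypothetical helper, `ZkSaslSetting.apply`, copies it into the system properties unless it was already set with `-D`. You would call it at the top of `main`, before `SpringApplication.run`. The name and the placement are illustrative choices, not part of canal or Spring.

```java
import java.util.Properties;

// Copies zookeeper.sasl.client into JVM system properties before any ZooKeeper client starts
// (hypothetical helper).
public class ZkSaslSetting {

    static final String KEY = "zookeeper.sasl.client";

    // A value already given on the command line (-D) wins over the application setting.
    static void apply(Properties appProps) {
        String v = appProps.getProperty(KEY);
        if (v != null && System.getProperty(KEY) == null) {
            System.setProperty(KEY, v.trim());
        }
    }

    public static void main(String[] args) {
        Properties app = new Properties();
        app.setProperty(KEY, "false");
        apply(app);
        // Only after this point create the connector, e.g. CanalConnectors.newClusterConnector(...)
        System.out.println(System.getProperty(KEY));
    }
}
```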
Reference: Increased CPU usage by unnecessary SASL checks
How to change the zookeeper.jar version that canal.client.jar depends on
Clone the official canal source with `git clone https://github.com/alibaba/canal.git`, change the zookeeper dependency in the client module's pom.xml, and run `mvn install` again:
```xml
    <artifactId>netty-all</artifactId>
</dependency>
<!-- zk -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.5</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
</dependency>
<!-- external -->
<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
</dependency>
<dependency>
    <groupId>commons-lang</groupId>
    <artifactId>commons-lang</artifactId>
</dependency>
<dependency>
    <groupId>commons-codec</groupId>
    <artifactId>commons-codec</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
```
Then point your own project's dependency at the package you just built with `mvn install`:
```xml
    <version>7.1.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <!--<version>1.1.3</version>-->
    <version>1.1.4-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>commons-lang</groupId>
    <artifactId>commons-lang</artifactId>
    <version>2.6</version>
</dependency>
```
ZooKeeper returns the container IP, but the host and the containers are on different network segments, so the host cannot reach (ping) that IP
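Editing the hosts file can only map domain names to IPs, and iptables can redirect ports. This problem calls for redirecting one IP to another, and I could not find a way to set that up on Windows or Mac. So instead we modify the official canal source to get the behavior we want, changing the connect() method of ClusterCanalConnector.java.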
The code before and after the change:
```java
// Before (excerpt of ClusterCanalConnector.connect())
int times = 0;
while (true) {
    currentConnector = new SimpleCanalConnector(null, username, password, destination) {

        @Override
        public SocketAddress getNextAddress() {
            return accessStrategy.nextNode();
        }
    };
    currentConnector.setSoTimeout(soTimeout);
    currentConnector.setIdleTimeout(idleTimeout);
    if (filter != null) {
        currentConnector.setFilter(filter);
    }
    // ...
```
```java
// After
@Override
public SocketAddress getNextAddress() {
    SocketAddress address = accessStrategy.nextNode();
    if (address instanceof InetSocketAddress) {
        String hostName = ((InetSocketAddress) address).getHostName();
        int port = ((InetSocketAddress) address).getPort();
        // Compare the first 3 characters of the canal server IP stored in ZooKeeper with those of the host IP
        if (hostName != null && !hostName.substring(0, 3).equals(AddressUti…
            // Not on the same network segment: use the host IP instead
            address = new InetSocketAddress(AddressUtils.getHostIp(), port);
        }
    }
    return address;
}
```
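Comparing the first three characters of two IPs only distinguishes this particular pair of address ranges (172.x versus 192.x). A somewhat sturdier variant of the same idea is sketched below. It assumes the canal servers live in the Docker network 172.18.0.0/16 created earlier. It checks subnet membership with a prefix mask and substitutes the host IP, keeping the port, only for addresses inside that subnet. The rewrite only works because each container publishes its canal port on the same host port (`-p 11111:11111`). `DockerAddressRewrite` and its methods are hypothetical and not part of canal. The host IP in the example is the 192.168.124.5 address seen in the logs above.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;

// Rewrites Docker-internal canal addresses to a host IP (hypothetical helper, not part of canal).
public class DockerAddressRewrite {

    // IPv4 literal -> 32-bit int. getByName does not query DNS for an IP literal.
    static int toInt(String ipv4) {
        try {
            byte[] b = InetAddress.getByName(ipv4).getAddress();
            if (b.length != 4) {
                throw new IllegalArgumentException("not an IPv4 address: " + ipv4);
            }
            return ((b[0] & 0xFF) << 24) | ((b[1] & 0xFF) << 16) | ((b[2] & 0xFF) << 8) | (b[3] & 0xFF);
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("bad address: " + ipv4, e);
        }
    }

    // True if ip lies inside the CIDR block, e.g. "172.18.0.0/16".
    static boolean inSubnet(String ip, String cidr) {
        String[] parts = cidr.split("/");
        int prefix = Integer.parseInt(parts[1]);
        int mask = prefix == 0 ? 0 : -1 << (32 - prefix);
        return (toInt(ip) & mask) == (toInt(parts[0]) & mask);
    }

    // Same port, host IP instead of the container IP when the address is inside the Docker subnet.
    static InetSocketAddress rewrite(InetSocketAddress addr, String dockerCidr, String hostIp) {
        if (inSubnet(addr.getHostString(), dockerCidr)) {
            return new InetSocketAddress(hostIp, addr.getPort());
        }
        return addr;
    }

    public static void main(String[] args) {
        InetSocketAddress fromZk = InetSocketAddress.createUnresolved("172.18.0.4", 11111);
        InetSocketAddress target = rewrite(fromZk, "172.18.0.0/16", "192.168.124.5");
        System.out.println(target.getHostString() + ":" + target.getPort()); // 192.168.124.5:11111
    }
}
```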
Trade-offs in technology selection
Source code of this article's example project ==> canal-elasticsearch-sync