使用canal增量同步mysql数据库信息到ElasticSearch

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 本文介绍如何使用canal增量同步mysql数据库信息到ElasticSearch。(注意:是增量!!!) 1.简介 1.1 canal介绍 Canal是一个基于MySQL二进制日志的高性能数据同步系统。

本文介绍如何使用canal增量同步mysql数据库信息到ElasticSearch。(注意:是增量!!!)

1.简介

1.1 canal介绍

Canal是一个基于MySQL二进制日志的高性能数据同步系统。Canal广泛用于阿里巴巴集团(包括https://www.taobao.com),以提供可靠的低延迟增量数据管道,github地址:https://github.com/alibaba/canal

Canal Server能够解析MySQL binlog并订阅数据更改,而Canal Client可以实现将更改广播到任何地方,例如数据库和Apache Kafka。

它具有以下功能:

  1. 支持所有平台。
  2. 支持由Prometheus提供支持的细粒度系统监控。
  3. 支持通过不同方式解析和订阅MySQL binlog,例如通过GTID。
  4. 支持高性能,实时数据同步。(详见Performance)
  5. Canal Server和Canal Client都支持HA / Scalability,由Apache ZooKeeper提供支持
  6. Docker支持。

缺点:

不支持全量更新,只支持增量更新。

完整wiki地址:https://github.com/alibaba/canal/wiki

1.2 运作原理

原理很简单:

  1. Canal模拟MySQL的slave的交互协议,伪装成mysql slave,并将转发协议发送到MySQL Master服务器。
  2. MySQL Master接收到转储请求并开始将二进制日志推送到slave(即canal)。
  3. Canal将二进制日志对象解析为自己的数据类型(原始字节流)

如图所示:

1.3 同步es

在同步数据到es的时候需要使用适配器:canal adapter。目前最新版本1.1.3,下载地址:https://github.com/alibaba/canal/releases

目前es貌似支持6.x版本,不支持7.x版本!!!

2.准备工作

2.1 es和jdk

安装es可以参考:https://www.dalaoyang.cn/article/78

安装jdk可以参考:https://www.dalaoyang.cn/article/16

2.2 安装canal server

下载canal.deployer-1.1.3.tar.gz

wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

解压文件

tar -zxvf canal.deployer-1.1.3.tar.gz

进入解压后的文件夹

cd canal.deployer-1.1.3

修改conf/example/instance.properties文件,主要注意以下几处:

  • canal.instance.master.address:数据库地址,例如127.0.0.1:3306
  • canal.instance.dbUsername:数据库用户
  • canal.instance.dbPassword:数据库密码

完整内容如下:

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=
#canal.instance.tsdb.dbUsername=
#canal.instance.tsdb.dbPassword=

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=12345678
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=

# mq config
#canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
#canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

回到canal.deployer-1.1.3目录下,启动canal:

sh bin/startup.sh

查看日志:

vi logs/canal/canal.log

查看具体instance日志:

 vi logs/example/example.log

关闭命令

sh bin/stop.sh

2.3 安装canal-adapter

下载canal.adapter-1.1.3.tar.gz

wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.adapter-1.1.3.tar.gz

解压

tar -zxvf canal.adapter-1.1.3.tar.gz

进入解压后的文件夹

cd canal.adapter-1.1.3

修改conf/application.yml文件,主要注意如下内容,由于是yml文件,注意我这里说明的属性名称:

  • server.port:canal-adapter端口号
  • canal.conf.canalServerHost:canal-server地址和ip
  • canal.conf.srcDataSources.defaultDS.url:数据库地址
  • canal.conf.srcDataSources.defaultDS.username:数据库用户名
  • canal.conf.srcDataSources.defaultDS.password:数据库密码
  • canal.conf.canalAdapters.groups.outerAdapters.hosts:es主机地址,tcp端口

完整内容如下:

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null


canal.conf:
  mode: tcp
  canalServerHost: 127.0.0.1:11111
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true
      username: root
      password: 12345678
  canalAdapters:
  - instance: example
    groups:
    - groupId: g1
      outerAdapters:
      - name: es
        hosts: 127.0.0.1:9300
        properties:
         cluster.name: elasticsearch

另外需要配置conf/es/*.yml文件,adapter将会自动加载conf / es下的所有.yml结尾的配置文件。在介绍配置前,需要先介绍一下本案例使用的表结构,如下:

CREATE TABLE `test` (
  `id` int(11) NOT NULL,
  `name` varchar(200) NOT NULL,
  `address` varchar(1000) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

需要手动去es中创建索引,比如这里使用es-head创建,如下图:

test索引结构如下:

{
    "mappings":{
        "_doc":{
            "properties":{
                "name":{
                    "type":"text"
                },
                "address":{
                    "type":"text"
                }
            }
        }
    }
}

接下来创建test.yml(文件名随意),内容很好理解_index为索引名称,sql为对应语句,内容如下:

dataSourceKey: defaultDS
destination: example
groupId:
esMapping:
  _index: test
  _type: _doc
  _id: _id
  upsert: true
  sql: "select a.id as _id,a.name,a.address from test a"
  commitBatch: 3000

配置完成后,回到canal-adapter根目录,执行命令启动

bin/startup.sh

查看日志

vi logs/adapter/adapter.log

关闭canal-adapter命令

bin/stop.sh

3.测试

都启动成功后,先查看一下es-head,如图,现在是没有任何数据的。

接下来,我们在数据库中插入一条数据进行测试,语句如下:

INSERT INTO `test`.`test`(`id`, `name`, `address`) VALUES (7, '北京', '北京市朝阳区');

然后在看一下es-head,如下

接下来看一下日志,如下:

2019-06-22 17:54:15.385 [pool-2-thread-1] DEBUG c.a.otter.canal.client.adapter.es.service.ESSyncService - DML: {"data":[{"id":7,"name":"北京","address":"北京市朝阳区"}],"database":"test","destination":"example","es":1561197255000,"groupId":null,"isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"test","ts":1561197255384,"type":"INSERT"} 
Affected indexes: test 

小知识点:上面介绍的查看日志的方法可能不是很好用,推荐使用如下语法,比如查看日志最后200行:

tail -200f logs/adapter/adapter.log

4.总结

1.全量更新不能实现,但是增删改都是可以的。
2.一定要提前创建好索引。
3.es配置的是tcp端口,比如默认的9300

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
7月前
|
canal 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在进行整库同步MySQL数据到StarRocks时,遇到全量数据可以同步,但增量数据无法同步,是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
canal 消息中间件 关系型数据库
Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
【9月更文挑战第1天】Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
925 4
|
4月前
|
存储 SQL 缓存
数据库测试|Elasticsearch和ClickHouse的对决
由于目前市场上主流的数据库有许多,这次我们选择其中一个比较典型的Elasticsearch来和ClickHouse做一次实战测试,让大家更直观地看到真实的比对数据,从而对这两个数据库有更深入的了解,也就能理解为什么我们会选择ClickHouse。
数据库测试|Elasticsearch和ClickHouse的对决
|
6月前
|
Oracle 关系型数据库 数据库
关系型数据库Oracle增量备份
【7月更文挑战第18天】
90 2
|
5月前
|
canal 关系型数据库 MySQL
"揭秘阿里数据同步黑科技Canal:从原理到实战,手把手教你玩转MySQL数据秒级同步,让你的数据处理能力瞬间飙升,成为技术界的新晋网红!"
【8月更文挑战第18天】Canal是一款由阿里巴巴开源的高性能数据同步系统,它通过解析MySQL的增量日志(Binlog),提供低延迟、可靠的数据订阅和消费功能。Canal模拟MySQL Slave与Master间的交互协议来接收并解析Binary Log,支持数据的增量同步。配置简单直观,包括Server和Instance两层配置。在实战中,Canal可用于数据库镜像、实时备份等多种场景,通过集成Canal Client可实现数据的消费和处理,如更新缓存或写入消息队列。
950 0
|
6月前
|
Java 关系型数据库 数据库
实时计算 Flink版产品使用问题之如何将增量数据直接写入下游数据库
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
关系型数据库 MySQL 数据库
关系型数据库mysql数据增量恢复
【7月更文挑战第3天】
171 2
|
7月前
|
分布式计算 大数据 关系型数据库
MaxCompute产品使用问题之如何实现MySQL的实时增量同步
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
131 6
|
7月前
|
canal 关系型数据库 MySQL
蓝易云 - 详解canal同步MySQL增量数据到ES
以上就是使用Canal同步MySQL增量数据到Elasticsearch的基本步骤。在实际操作中,可能还需要根据具体的业务需求和环境进行一些额外的配置和优化。
198 2
|
26天前
|
存储 Oracle 关系型数据库
数据库传奇:MySQL创世之父的两千金My、Maria
《数据库传奇:MySQL创世之父的两千金My、Maria》介绍了MySQL的发展历程及其分支MariaDB。MySQL由Michael Widenius等人于1994年创建,现归Oracle所有,广泛应用于阿里巴巴、腾讯等企业。2009年,Widenius因担心Oracle收购影响MySQL的开源性,创建了MariaDB,提供额外功能和改进。维基百科、Google等已逐步替换为MariaDB,以确保更好的性能和社区支持。掌握MariaDB作为备用方案,对未来发展至关重要。
55 3