基于Canal的MySQL=>ES数据同步方案

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 基于Canal的MySQL=>ES数据同步方案

1、MySQL和ES的主要区别?

1.1 功能性

MySQL作为最常用的DB之一,在DB-Ranking排名常年保持前三,仅次于Oracle。为什么还需要把数据同步至排名只有第八的Elasticsearch?相信不了解这两个数据库的区别的同学很容易有这样的疑问。下面我来解答这个问题。

541842ada18448ef97eaf0666e79d0d4.png


首先两款数据库在功能上的区别:

MySQL Elasticsearch
关系型 搜索引擎
单体 分布式
OLTP OLAP
支持事务 不支持事务
SQL DSL


1.2 性能指标

再来看一下两种数据库的性能对比

MySQL Elasticsearch
检索性能 ★★★ ★★★★★
扩展能力 ★★ ★★★★★
写入实时性 ★★★★ ★★(可配置)
准确性 ★★★★ ★★
支持的数据类型 ★★★ ★★★★
灵活性 ★★★ ★★★★
事务支持 ★★★ ★★★★

可以看到,Elasticsearch在检索和扩展能力方面碾压MySQL。实际上MySQL在数据量达到 2-3千万的时候查询性能开始显著下降,而ES的检索性能,实测在数据量1亿左右,单节点1G内存,无任何优化的前提下仍能达到10ms以内的检索性能。并且ES具有MySQL无法相比的横向扩展能力,具备极高的可用性和扩展性。


1.3 在搜索业务上的区别

同样是检索结果,ES中的“检索”和关系型数据库的“查询”是两个不同的概念。搜索引擎的概念里,“检索”和相关度紧密耦合。


1.3.1 查询

关系数据库中“查询”的概念具有“完全相关性”,即搜索结果要么完全满足要求,要么不满足,而不存在“部分满足”的概念。


1.3.2 检索

而以ES为代表的搜索引擎中的“检索”(泛指全文检索),具备相关度的概念,即和搜索词预期搜索结果相符的程度。影响相关度的因素有很多,搜索量,数据本身等都可能影响最终结果。ES以相关度评分来衡量相关度的结果,这里不什么是相关度评分,如果感兴趣,可以异步“相关度评分”一文。


举个简单的例子:当用户搜索词为“小米手机”的时候,“查询”的执行逻辑为field_name='小米手机'或者field_name like '%小米手机%',即搜索结果的匹配逻辑为完全匹配或包含关系。而如果是检索,假设分词结果为“小米”和“手机”两个词项,搜索结果可能只包含“小米”或者只“手机”,如果设置了同义词等逻辑,匹配出手表、华为也是符合结果的。即搜索结果可能完全不包含搜索词。


2、为什么要做数据同步

MySQL是关系型数据库,而Elasticsearch是搜索引擎的核心组件之一,属于非关系型数据库。有一句经典名言叫:存在即合理,两款数据库既然同时存在,那么他们必然有对方不可取代的地方。

这里只讨论两者最主要的不可替代特性,至于扩展能力、数据类型等问题本文暂不讨论


2.1 检索性能

ES擅长海量数据的检索,支持PB级数据的秒级查询,以亿为单位的数据在ES看来只是起步而已,并且搜索结果具有非完全相关性,而是全文检索。其相关度有专门的算法来计算,主要是TF-IDF和BM25,两种算法这里不赘述,我的文章里有详细介绍。而MySQL虽然也有“全文索引”,但是非常积累,不管存存储效率还是检索效率都远不如ES,这也是其底层的数据结构和算法导致。普通索引目前底层数据结构为B+Trees,这种数据结构不适合存储长文本类型的索引,会导致树的深度过大,检索效率极低。其底层原理我有单独的文章详细介绍(待更新)。


2.2 写入性能

ES是OLAP系统,侧重于海量数据的检索,而写入实时性并不是很高,默认1秒,也就是ES缓冲区Buffer的刷新间隔时间,不了解Elasticsearch写入原理的同学可以暂时忽略。ES并非忽略了对写入性能的优化,而是“有意为之”,其原因就在于基于ES的写入机制,其写入实时性和大数据检索性能是一个二选一的行为。实际上生产环境中我们经常通过“牺牲写入实时性”的操作来换取更高更快的“数据检索”性能。


2.3 事务支持

正因为ES的写入实时性并不高,如果我们需要快速响应用户请求,我们常采取的手段就是使用缓存,但是在很多高并发的场景下,我们需要数据保持强一致性(如银行系统),因此需要使用具有ACID特性的数据库来支持,而MySQL就是一个比较好的选择。


但是如果单单只用MySQL,又无法解决海量数据的检索问题。其实在实际工作中常见的做法就是两者结合起来,通过数据同步的操作将MySQL的实时数据同步至Elasticsearch,两者各司其职互不干扰。


3、Canal

3.1 Canal是啥

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。


3.2 应用场景

Elasticsearch 不支持事务。 ES通常在分布式系统架构中承担“搜索引擎”的角色,一般来说解决词类问题,可以把ES和支持ACID特性的关系型数据库结合起来使用。

首先把对数据的更(增删改)操作在RDB中执行,然后把这些动作同步到Elasticsearch。 通过这种方式,你将受益于数据库 ACID 事务支持,并且在 Elasticsearch 中以正确的顺序产生变更。 并发在关系数据库中得到了处理。

MySQL为例,如果要把数据从同步至ES,canal + binary log就是常用的一种增量解决方案。


3.3 原理机制

7ba90dbf94c6a940b6a474309c83ab25.jpg


3.3.1 MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据


3.3.2 canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

7138b15848a0e26bd84862cc01d0b03c.png


3.3 优势

  • 准实时性
  • 性能好
  • 一劳永逸


4 基于canal的MySQL=>ES数据同步方案

4.1 环境:

Java和ES兼容性:https://www.elastic.co/cn/support/matrix#matrix_jvm

  • JDK:1.8
  • Elasticsearch:7.x
  • MySQL: 5.7
  • Canal: 1.1.4


4.2 下载地址:


4.3 步骤

4.3.1 保证Elasticsearch服务可用

使用cancal向ES同步数据之前需要保证ES的服务是可以正常访问的


4.3.2 保证MySQL服务可用

不再赘述


4.3.3 开启MySQL的binary log(主备模式)

配置MySQL:

server_id = 1 #开启主从模式后每个MySQL节点的id
log-bin = mysql-bin #bin-log的存储位置
binlog-format = ROW #选择存储binlog日志方式为ROW模式


重启MySQL服务

验证是否开启成功

SHOW VARIABLES LIKE 'log_bin';
log_bin ON #开启


4.4.4 canal-deployer

配置:conf/example/instance.properties

#canal示例的slaveId
canal.instance.mysql.slaveId=1234
#mysql地址
canal.instance.master.address= 127.0.0.1:3306
#用户名
canal.instance.dbUsername = root
#密码
canal.instance.dbPassword = 123456
#指定需要同步的数据库
canal.instance.defaultDatabaseName = msb_order
#指定编码方式
canal.instance.connectionCharset = UTF-8
#监控的是所有数据库,所有的表改动都会监控到,这样可能会浪费不少性能,可能我只想监控的是某一个数据库下的表。
#  .*\\..*表示监控所有数据库,canal\\..*表示监控canal数据库
canal.instance.filter.regex = .\*\\\\..\*


启动: ./startup.sh(Linux)

验证: demo


4.4.5 canal-admin

配置:conf/application.yml

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: root
  password: 123456
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1
canal:
  adminUser: admin
  adminPasswd: admin

启动管理服务

访问服务:server_ip:8089


4.4.6 canal-adapter

配置:conf/application.yml

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/msb_order?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7
        hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
        key: exampleKey
        properties:
          mode: rest # transport or rest
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: elasticsearch


配置:conf/es7/my_order.yml

这里要注意,在写SQL语句的时候,要保证a.id as _id,即把id属性改为_id,以保证能正常写入ES的_id字段。

dataSourceKey: defaultDS        # 源数据源的key, 对应上面配置的srcDataSources中的值
outerAdapterKey: exampleKey     # 对应application.yml中es配置的key 
destination: example            # cannal的instance或者MQ的topic
groupId: g1                     # 对应MQ模式下的groupId, 只会同步对应groupId的数据_search
esMapping:
  _index: msb_order             # es 的索引名称
  _type: _doc         # es7 固定'_doc',es8删除
  _id: _id                      # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
  upsert: true
  sql: "SELECT a.id as _id, a.customer_phone, a.customer_name, a.customer_region, a.customer_city, a.customer_district, a.customer_addr, a.failure_phenomenon, a.failure_phenomenon_text, a.book_date, a.service_required, a.buyshop_detail, a.buy_shop, a.buy_way, a.product_type, a.model_no, a.process_type, a.ex_order_no, a.customer_country, a.source, a.matnr, a.buyDate, a.serviceReq, a.create_time, a.result, a.rvstatus, a.ip_address FROM msb_order_1 a"
  commitBatch: 3000  


服务启动

注意:

  • 索引msb_order的mapping必须提前创建好
  • 索引中的id字段是_id,因此需要查询的时候需要id as _id
相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
12天前
|
存储 SQL 关系型数据库
Mysql高可用架构方案
本文阐述了Mysql高可用架构方案,介绍了 主从模式,MHA模式,MMM模式,MGR模式 方案的实现方式,没有哪个方案是完美的,开发人员在选择何种方案应用到项目中也没有标准答案,合适的才是最好的。
69 3
Mysql高可用架构方案
|
1月前
|
监控 关系型数据库 MySQL
深入了解MySQL主从复制:构建高效稳定的数据同步架构
深入了解MySQL主从复制:构建高效稳定的数据同步架构
120 1
|
13天前
|
关系型数据库 MySQL
mysql 5.7.x版本查看某张表、库的大小 思路方案说明
mysql 5.7.x版本查看某张表、库的大小 思路方案说明
37 5
|
18天前
|
关系型数据库 MySQL
mysql 5.7.x版本查看某张表、库的大小 思路方案说明
mysql 5.7.x版本查看某张表、库的大小 思路方案说明
28 1
|
29天前
|
消息中间件 NoSQL 关系型数据库
一文彻底搞定Redis与MySQL的数据同步
【10月更文挑战第21天】本文介绍了 Redis 与 MySQL 数据同步的原因及实现方式。同步的主要目的是为了优化性能和保持数据一致性。实现方式包括基于数据库触发器、应用层双写和使用消息队列。每种方式都有其优缺点,需根据具体场景选择合适的方法。此外,文章还强调了数据同步时需要注意的数据一致性、性能优化和异常处理等问题。
323 0
|
1月前
|
SQL 关系型数据库 MySQL
mysql集群方案
mysql集群方案
39 0
|
8天前
|
SQL 关系型数据库 MySQL
go语言数据库中mysql驱动安装
【11月更文挑战第2天】
22 4
|
6天前
|
SQL 关系型数据库 MySQL
12 PHP配置数据库MySQL
路老师分享了PHP操作MySQL数据库的方法,包括安装并连接MySQL服务器、选择数据库、执行SQL语句(如插入、更新、删除和查询),以及将结果集返回到数组。通过具体示例代码,详细介绍了每一步的操作流程,帮助读者快速入门PHP与MySQL的交互。
19 1
|
1月前
|
存储 关系型数据库 MySQL
Mysql(4)—数据库索引
数据库索引是用于提高数据检索效率的数据结构,类似于书籍中的索引。它允许用户快速找到数据,而无需扫描整个表。MySQL中的索引可以显著提升查询速度,使数据库操作更加高效。索引的发展经历了从无索引、简单索引到B-树、哈希索引、位图索引、全文索引等多个阶段。
61 3
Mysql(4)—数据库索引
|
15天前
|
监控 关系型数据库 MySQL
数据库优化:MySQL索引策略与查询性能调优实战
【10月更文挑战第27天】本文深入探讨了MySQL的索引策略和查询性能调优技巧。通过介绍B-Tree索引、哈希索引和全文索引等不同类型,以及如何创建和维护索引,结合实战案例分析查询执行计划,帮助读者掌握提升查询性能的方法。定期优化索引和调整查询语句是提高数据库性能的关键。
77 1

热门文章

最新文章