MySQL到Elasticsearch数据同步

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: MySQL数据同步Elasticsearch方案

MySQL与Elasticsearch同步

当业务需要对海量数据进行多维度、实时的搜索时,关系型数据库显然力不从心。一个非常典型的例子就是对产品或者商品进行多维度搜索。此时,业务经常需要借助搜索引擎Elasticsearch满足多样化的实时搜索诉求。搜索开始前,首先需要解决的问题是如何将待搜索数据从关系型数据库实时同步到Elaticsearch

方案1:阿里云数据传输DTS

DTS支持本地IDC自建MySQL/其他云厂商MySQL/阿里云ECS自建MySQL/RDS MySQL->Elasticsearch之间的数据实时同步。通过DTS提供的数据实时同步功能,用户只要3步就可搭建起MySQL同Elasticsearch的实时同步实例,实现基于MySQL Binlog的毫秒级同步延迟。

方案2:Logstash将MySQL数据同步到ElasticSearch

开源工具成本高:待同步表必须更新时间字段、业务写入数据必须更新时间
性能影响大:通过SQL读取数据、对线上业务影响大
同步延迟高:定期数据同步、同步时延高达数小时
稳定性差:无法解决RDS实例迁移及、日志回收情况下的同步稳定性

除mysql外,还支持oracle、SQL server、PostgreSQL等数据库类型

使用logstash-input-jdbc插件读取mysql的数据,这个插件的工作原理比较简单,就是定时执行一个sql,然后将sql执行的结果写入到流中,增量获取的方式没有通过 binlog 方式同步,而是用一个递增字段作为条件去查询,每次都记录当前查询的位置,由于递增的特性,只需要查询比当前大的记录即可获取这段时间内的全部增量,一般的递增字段有两种,AUTO_INCREMENT 的主键 id 和 ON UPDATE CURRENT_TIMESTAMP的update_time字段,id字段只适用于那种只有插入没有更新的表,update_time 更加通用一些,建议在mysql表设计的时候都增加一个update_time字段。

  • 安装MySQL的jdbc驱动
wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.46.zip
unzip mysql-connector-java-5.1.46.zip
  • 在mysql库中创建一个测试表(其中updatetime用于记录数据更新时间戳):
create table jm_es_employee (         
id varchar(10),     
first_name varchar(20),     
last_name varchar(20),     
age int(10),     
about varchar(100),     
interests varchar(100),     
updatetime timestamp null default current_timestamp on update current_timestamp );

创建Logstash配置文件,文件名为logstash-mysql-es.conf

input{
     jdbc {
         jdbc_driver_library => "/opt/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
         jdbc_driver_class => "com.mysql.jdbc.Driver"
         jdbc_connection_string => "jdbc:mysql://39.107.84.120:3306/lidehang"
         jdbc_user => "root"
         jdbc_password => "Ldh_0571@"
         jdbc_paging_enabled => "true"
         jdbc_page_size => "1000"
         jdbc_default_timezone =>"Asia/Shanghai"
         #设置监听间隔 各字段含义(从左至右)分、时、天、月、年,全为*默认含义为每分钟都更新
         schedule => "* * * * *"
         statement => "select * from jm_es_employee where updatetime >= :sql_last_value"
         #statement_filepath => "mysql.sql"
         use_column_value => true
         tracking_column_type => "timestamp"
         tracking_column => "updatetime"
         last_run_metadata_path => "./logstash_jdbc_last_run"
       }
}
output{
      elasticsearch {
         hosts => "172.17.134.34:9200"
         index => "employee"
         document_id => "%{id}"
      }
      stdout {
         codec => json_lines
     }
 }
jdbc_driver_library: jdbc驱动的路径,在上一步中已经下载
jdbc_driver_class: 驱动类的名字,mysql填com.mysql.jdbc.Driver
jdbc_connection_string: mysql 地址
jdbc_user: mysql 用户
jdbc_password: mysql密码
schedule: 执行sql时机,类似 crontab 的调度,上面配置表示每分钟刷新一次。
statement: 要执行的sql,以 “:” 开头是定义的变量,可以通过parameters 来设置变量,这里的sql_last_value是内置的变量,表示上一次sql执行中update_time的值
statement_filepath:和上面statement参数二选一,存放需要执行的SQL语句的文件位置,适用于多个sql语句的场景。
use_column_value: 使用递增列的值
tracking_column_type: 递增字段的类型,numeric表示数值类型, timestamp 表示时间戳类型
tracking_column: 递增字段的名称,这里使用updatetime这一列,这列的类型是timestamp
last_run_metadata_path: 同步点文件,这个文件记录了上次的同步点,重启时会读取这个文件,这个文件可以手动修改

index: 导入到es中的index名,这里我直接设置成了mysql表的名字
document_id: 导入到es中的文档id,这个需要设置成主键,否则同一条记录更新后在es中会出现两条记录,%{id} 表示引用mysql表中id字段的值
  • 启动logstash服务,开始同步mysql数据到es:

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash-mysql-es.conf

  • 写入几条测试数据:
INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('001','John','Smith', 25, 'I love to go rock climbing','[ "sports", "music" ]'); 
INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('002','Jane','Smith', 32, 'I like to collect rock albums','[ "music" ]'); 
INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('003','Douglas','Fir', 35, 'I like to build cabinets','[ "forestry" ]'); 

执行命令检查ES服务中的索引是否被创建:

curl -XGET 'http://172.17.134.34:9200/_cat/indices?v'

通过关键字检索ES服务,验证写入Mysql的数据是否被成功索引到ES并被检索到,执行命令通过关键字“Smith “来检索数据:

curl -XGET 'http://172.17.134.34:9200/employee/_search?q=last_name:Smith&pretty' 

方案3:使用阿里云开源工具Canal

canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

  • canal工作原理

canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
mysql master收到dump请求,开始推送binary log给slave(也就是canal)
canal解析binary log对象(原始为byte流)
image

  • Canal 的组成部分

简单来说,Canal 会将自己伪装成 MySQL 从节点(Slave),并从主节点(Master)获取 Binlog,解析和贮存后供下游消费端使用。Canal 包含两个组成部分:服务端和客户端。服务端负责连接至不同的 MySQL 实例,并为每个实例维护一个事件消息队列;客户端则可以订阅这些队列中的数据变更事件,处理并存储到数据仓库中。下面我们来看如何快速搭建起一个 Canal 服务。

  • 配置 MySQL 主节点

MySQL 默认没有开启 Binlog,因此我们需要对 my.cnf 文件做以下修改:

server-id = 1
log_bin = /path/to/mysql-bin.log
binlog_format = ROW

注意 binlog_format 必须设置为 ROW, 因为在 STATEMENT 或 MIXED 模式下, Binlog 只会记录和传输 SQL 语句(以减少日志大小),而不包含具体数据,我们也就无法保存了。

从节点通过一个专门的账号连接主节点,这个账号需要拥有全局的 REPLICATION 权限。我们可以使用 GRANT 命令创建这样的账号:

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT
ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
  • 启动 Canal 服务端

从 GitHub 项目发布页中下载 Canal 服务端代码(https://github.com/alibaba/canal/releases
配置文件在 conf 文件夹下,有以下目录结构:

canal.deployer/conf/canal.properties
canal.deployer/conf/instanceA/instance.properties
canal.deployer/conf/instanceB/instance.properties

conf/canal.properties 是主配置文件,如其中的 canal.port 用以指定服务端监听的端口。instanceA/instance.properties 则是各个实例的配置文件,主要的配置项有:

# slaveId 不能与 my.cnf 中的 server-id 项重复
canal.instance.mysql.slaveId = 1234
canal.instance.master.address = 127.0.0.1:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.connectionCharset = UTF-8
# 订阅实例中所有的数据库和表
canal.instance.filter.regex = .*\\..*
  • 编写 Canal 客户端

从服务端消费变更消息时,我们需要创建一个 Canal 客户端,指定需要订阅的数据库和表,并开启轮询。

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
1月前
|
监控 关系型数据库 MySQL
深入了解MySQL主从复制:构建高效稳定的数据同步架构
深入了解MySQL主从复制:构建高效稳定的数据同步架构
119 1
|
2月前
|
canal 消息中间件 关系型数据库
Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
【9月更文挑战第1天】Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
618 4
|
3月前
|
关系型数据库 MySQL 数据库
【MySQL】手把手教你MySQL数据同步
【MySQL】手把手教你MySQL数据同步
|
1月前
|
关系型数据库 MySQL API
MySQL 历史数据迁移到 Elasticsearch
MySQL 历史数据迁移到 Elasticsearch
71 4
|
29天前
|
消息中间件 NoSQL 关系型数据库
一文彻底搞定Redis与MySQL的数据同步
【10月更文挑战第21天】本文介绍了 Redis 与 MySQL 数据同步的原因及实现方式。同步的主要目的是为了优化性能和保持数据一致性。实现方式包括基于数据库触发器、应用层双写和使用消息队列。每种方式都有其优缺点,需根据具体场景选择合适的方法。此外,文章还强调了数据同步时需要注意的数据一致性、性能优化和异常处理等问题。
321 0
|
2月前
|
NoSQL 关系型数据库 Redis
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
mall在linux环境下的部署(基于Docker容器),docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongodb、minio详细教程,拉取镜像、运行容器
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
|
1月前
|
消息中间件 监控 关系型数据库
MySQL数据实时同步到Elasticsearch:技术深度解析与实践分享
在当今的数据驱动时代,实时数据同步成为许多应用系统的核心需求之一。MySQL作为关系型数据库的代表,以其强大的事务处理能力和数据完整性保障,广泛应用于各种业务场景中。然而,随着数据量的增长和查询复杂度的提升,单一依赖MySQL进行高效的数据检索和分析变得日益困难。这时,Elasticsearch(简称ES)以其卓越的搜索性能、灵活的数据模式以及强大的可扩展性,成为处理复杂查询需求的理想选择。本文将深入探讨MySQL数据实时同步到Elasticsearch的技术实现与最佳实践。
81 0
|
8天前
|
SQL 关系型数据库 MySQL
go语言数据库中mysql驱动安装
【11月更文挑战第2天】
22 4
|
6天前
|
SQL 关系型数据库 MySQL
12 PHP配置数据库MySQL
路老师分享了PHP操作MySQL数据库的方法,包括安装并连接MySQL服务器、选择数据库、执行SQL语句(如插入、更新、删除和查询),以及将结果集返回到数组。通过具体示例代码,详细介绍了每一步的操作流程,帮助读者快速入门PHP与MySQL的交互。
19 1
|
15天前
|
监控 关系型数据库 MySQL
数据库优化:MySQL索引策略与查询性能调优实战
【10月更文挑战第27天】本文深入探讨了MySQL的索引策略和查询性能调优技巧。通过介绍B-Tree索引、哈希索引和全文索引等不同类型,以及如何创建和维护索引,结合实战案例分析查询执行计划,帮助读者掌握提升查询性能的方法。定期优化索引和调整查询语句是提高数据库性能的关键。
76 1