【技术实验】mysql准实时同步数据到Elasticsearch

简介: Elasticsearch作为大数据场景下搜索和分析的引擎,广泛应用于实时数据分析等场景。本文作者梳理了从MySQL准实时同步数据到Elasticsearch的实操步骤,帮助开发者理解和快速上手。

实验背景

Elasticsearch在阿里云商业化已经有一段时间,它作为大数据场景下搜索和分析的引擎,可以用于很多场景。前两天有同学提到需要将MySQL中的数据准实时的同步到ElasticSearch中的需求,由于自己对ES也很感兴趣但一直没有机会实操,恰好趁这个机会学习验证了一下,并把过程记录下来,方便新人尽快上手少走弯路。

本次实操采用Logstash实现将MySQL数据实时同步到ElasticSearch,过程中主要以实操步骤为主,并没有详细介绍这两个产品本身,所以文档适合对Logstash和Elasticsearch有一些基础的人阅读。使用Logstash过程中会用到一些参数,每个参数的详细解释请参考官方文档。

实验步骤

1. 开通阿里云Elasticsearch

开通阿里云Elasticsearch。产品链接

_

ES控制台中修改配置,允许自动创建索引。

_

2. 开通对应的ECS和RDS(MySQL)

开通与Elasticsearch相同Region、相同专有网络下的ECS一台,建议配置2核8G,操作系统:Aliyun Linux 17.1 64位,用于运行Logstash程序。

_

开通RDS(MySQL)

_

3. 在ECS上安装和配置curl

SSH登陆到ESC中,检查ECS是否已经安装curl。

curl是利用URL语法在命令行方式下工作的开源文件传输工具,可以用于测试访问ES服务。(Aliyun Linux 17.1 64位默认是预装curl的,若其他系统没有预装请自行安装)。
_

检查ECS中是否可以通过curl命令访问到ES服务。

curl -u ES用户名:ES密码 -XGET 'http://ES服务IP:9200/?pretty'
红色字体部分要替换相应的用户名、密码和ES服务IP或域名。如下图所以,能返回红框内的内容说明curl是可以访问到ES服务的。
_

4. 安装JDK8、MySQL5.6驱动以及Logstash -6.0.0

ECS中分别安装JDK8、MySQL5.6驱动以及Logstash -6.0.0。如下图:

_

安装Logstash input、output插件,此案例数据输入是MySQL,输出是ES,so相应的插件应该是logstash-input-jdbc和logstash-output-elasticsearch。

安装插件的命令分别是(在Logstash主目录下运行):
./bin/logstash-plugin install logstash-input-jdbc
./bin/logstash-plugin install logstash-output-elasticsearch
_

5. MySQL中创建数据库、测试的数据表

如下图所示

_
建表语句(其中updatetime用于记录数据更新时间戳):

create table jm_es_employee (         
id varchar(10),     
first_name varchar(20),     
last_name varchar(20),     
age int(10),     
about varchar(100),     
interests varchar(100),     
updatetime timestamp null default current_timestamp on update current_timestamp );

6. 配置Logstash作业文件

ECS中创建Logstash作业配置文件,文件名为logstash-mysql-es.conf。

配置文件内容:

input{
     jdbc {
         jdbc_driver_library => "mysql-connector-java-5.1.44-bin.jar"
         jdbc_driver_class => "com.mysql.jdbc.Driver"
         jdbc_connection_string => "jdbc:mysql://rm-***.mysql.rds.aliyuncs.com:3306/db_name"
         jdbc_user => "db_user"
         jdbc_password => "db_password"
         jdbc_paging_enabled => "true"
         jdbc_page_size => "1000"
         jdbc_default_timezone =>"Asia/Shanghai"
         schedule => "* * * * *"
         statement => "select * from jm_es_employee where updatetime > :sql_last_value"
         use_column_value => true
         tracking_column => "updatetime"
         last_run_metadata_path => "./logstash_jdbc_last_run"
       } 
} 
output{
      elasticsearch {
         hosts => "es-cn-***.elasticsearch.aliyuncs.com:9200"
         user => "elastic"
         password => "es_password"
         index => "employee"
         document_id => "%{id}"
      }
      stdout {
         codec => json_lines
     }
 } 

其中红色字体部分要做相应的替换,input中的 schedule参数用于配置数据刷新频率,schedule => " *"表示每分钟刷新一次,这也是MySQL数据同步的最小频率。Logstash支持丰富的参数配置,详情请参考Elasitc官网文档

7. 同步数据

ECS中指定参数启动Logstash服务,执行命令:

logstash -f logstash-mysql-es.conf

_

之后每分钟会去MySQL中刷新数据

_

RDS中写入几条测试数据,脚本如下:

INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('001','John','Smith', 25, 'I love to go rock climbing','[ "sports", "music" ]'); 
INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('002','Jane','Smith', 32, 'I like to collect rock albums','[ "music" ]'); 
INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('003','Douglas','Fir', 35, 'I like to build cabinets','[ "forestry" ]'); 

由于之前在Logstash配置文件中,output部分既配置了输出到ES,同时也输出到控制台。所以当检测到MySQL中有更新时,数据会输出到控制台中,如下图:

_

此时说明MySQL中的数据更新已经被Logstash推送到ES服务。通过在ECS执行命令检查ES服务中的索引是否被创建。执行命令:

curl -u elastic:es_password -XGET 'http://es-cn-***.elasticsearch.aliyuncs.com:9200/_cat/indices?v'

_

红框内的employee即我们在配置文件中指定的索引名,说明ES中的索引已经被成功创建。

8. 结果验证

通过关键字检索ES服务,验证写入Mysql的数据是否被成功索引到ES并被检索到,执行命令通过关键字“Smith “来检索数据:

curl -u elastic:es_password -XGET 'http://es-cn-***.elasticsearch.aliyuncs.com:9200/employee/_search?q=last_name:Smith&pretty' 

_

至此,MySQL中的数据已经被成功索引到Elasticsearch,并也可以被准实时的检索到。

作者:奇米 阿里巴巴高级工程师

加入钉钉技术讨论群

dingQR


阿里云Elasticsearch已正式发布啦,Elastic开源官方联合开发,集成5.5.3商业版本XPack功能,欢迎开通使用。
点击了解更多产品信息
相关实践学习
以电商场景为例搭建AI语义搜索应用
本实验旨在通过阿里云Elasticsearch结合阿里云搜索开发工作台AI模型服务,构建一个高效、精准的语义搜索系统,模拟电商场景,深入理解AI搜索技术原理并掌握其实现过程。
ElasticSearch 最新快速入门教程
本课程由千锋教育提供。全文搜索的需求非常大。而开源的解决办法Elasricsearch(Elastic)就是一个非常好的工具。目前是全文搜索引擎的首选。本系列教程由浅入深讲解了在CentOS7系统下如何搭建ElasticSearch,如何使用Kibana实现各种方式的搜索并详细分析了搜索的原理,最后讲解了在Java应用中如何集成ElasticSearch并实现搜索。  
相关文章
|
9月前
|
缓存 NoSQL 关系型数据库
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
|
7月前
|
SQL 人工智能 关系型数据库
如何实现MySQL百万级数据的查询?
本文探讨了在MySQL中对百万级数据进行排序分页查询的优化策略。面对五百万条数据,传统的浅分页和深分页查询效率较低,尤其深分页因偏移量大导致性能显著下降。通过为排序字段添加索引、使用联合索引、手动回表等方法,有效提升了查询速度。最终建议根据业务需求选择合适方案:浅分页可加单列索引,深分页推荐联合索引或子查询优化,同时结合前端传递最后一条数据ID的方式实现高效翻页。
387 0
|
6月前
|
存储 关系型数据库 MySQL
在CentOS 8.x上安装Percona Xtrabackup工具备份MySQL数据步骤。
以上就是在CentOS8.x上通过Perconaxtabbackup工具对Mysql进行高效率、高可靠性、无锁定影响地实现在线快速全量及增加式数据库资料保存与恢复流程。通过以上流程可以有效地将Mysql相关资料按需求完成定期或不定期地保存与灾难恢复需求。
507 10
|
7月前
|
SQL 存储 缓存
MySQL 如何高效可靠处理持久化数据
本文详细解析了 MySQL 的 SQL 执行流程、crash-safe 机制及性能优化策略。内容涵盖连接器、分析器、优化器、执行器与存储引擎的工作原理,深入探讨 redolog 与 binlog 的两阶段提交机制,并分析日志策略、组提交、脏页刷盘等关键性能优化手段,帮助提升数据库稳定性与执行效率。
189 0
|
10月前
|
关系型数据库 MySQL Linux
在Linux环境下备份Docker中的MySQL数据并传输到其他服务器以实现数据级别的容灾
以上就是在Linux环境下备份Docker中的MySQL数据并传输到其他服务器以实现数据级别的容灾的步骤。这个过程就像是一场接力赛,数据从MySQL数据库中接力棒一样传递到备份文件,再从备份文件传递到其他服务器,最后再传递回MySQL数据库。这样,即使在灾难发生时,我们也可以快速恢复数据,保证业务的正常运行。
481 28
|
Java 关系型数据库 MySQL
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
2894 45
|
9月前
|
存储 SQL 缓存
mysql数据引擎有哪些
MySQL 提供了多种存储引擎,每种引擎都有其独特的特点和适用场景。以下是一些常见的 MySQL 存储引擎及其特点:
237 0
|
11月前
|
存储 SQL 关系型数据库
【YashanDB知识库】MySQL迁移至崖山char类型数据自动补空格问题
**简介**:在MySQL迁移到崖山环境时,若字段类型为char(2),而应用存储的数据仅为'0'或'1',查询时崖山会自动补空格。原因是mysql的sql_mode可能启用了PAD_CHAR_TO_FULL_LENGTH模式,导致保留CHAR类型尾随空格。解决方法是与应用确认数据需求,可将崖山环境中的char类型改为varchar类型以规避补空格问题,适用于所有版本。
|
11月前
|
SQL 关系型数据库 MySQL
【YashanDB知识库】字符集latin1的MySQL中文数据如何迁移到YashanDB
本文探讨了在使用YMP 23.2.1.3迁移MySQL Server字符集为latin1的中文数据至YashanDB时出现乱码的问题。问题根源在于MySQL latin1字符集存放的是实际utf8编码的数据,而YMP尚未支持此类场景。文章提供了两种解决方法:一是通过DBeaver直接迁移表数据;二是将MySQL表数据转换为Insert语句后手动插入YashanDB。同时指出,这两种方法适合单张表迁移,多表迁移可能存在兼容性问题,建议对问题表单独处理。
【YashanDB知识库】字符集latin1的MySQL中文数据如何迁移到YashanDB
|
11月前
|
缓存 NoSQL 关系型数据库
Redis和Mysql如何保证数据⼀致?
1. 先更新Mysql,再更新Redis,如果更新Redis失败,可能仍然不⼀致 2. 先删除Redis缓存数据,再更新Mysql,再次查询的时候在将数据添加到缓存中 这种⽅案能解决1 ⽅案的问题,但是在⾼并发下性能较低,⽽且仍然会出现数据不⼀致的问题,⽐如线程1删除了 Redis缓存数据,正在更新Mysql,此时另外⼀个查询再查询,那么就会把Mysql中⽼数据⼜查到 Redis中 1. 使用MQ异步同步, 保证数据的最终一致性 我们项目中会根据业务情况 , 使用不同的方案来解决Redis和Mysql的一致性问题 : 1. 对于一些一致性要求不高的场景 , 不做处理例如 : 用户行为数据 ,

热门文章

最新文章

相关产品

  • 检索分析服务 Elasticsearch版
  • 推荐镜像

    更多