Logstash-input-jdbc实现与MySql解耦同步数据到ES

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: ElasticSearch作为实时的分布式搜索和分析引擎,能够快速查询和处理海量数据,高可用可扩展,通过ES,可以轻松做到保证查询效率的聚合查询。

随着业务数据的增长,Mysql压力增大,单表储存会严重影响读写的高效性,为解决这一问题,分表分库和历史数据迁移等解决方案就应运而生了,但在某些场景下这些解决方案还是存在问题,例如聚合查询等。而ElasticSearch作为实时的分布式搜索和分析引擎,能够快速查询和处理海量数据,高可用可扩展,通过ES,可以轻松做到保证查询效率的聚合查询,已有不少大型互联网企业成功使用案例,是值得一试的技术。

使用了一段时间ES之后,发现对数据的实时插入对网络带宽有比较大的消耗,且高并发时对服务器产生的压力也不小,所以能不能定时地去查询sql数据库,保持数据的同步,来解耦这个问题呢?

经过调研,实现与Mysql同步工具基本有以下几种:

  • logstash-input-jdbc:官方推荐的插件,Ruby编写的
  • go-mysql-elastic:Go编写,国内作者
  • python-mysql-replication: Python编写
  • elasticsearch-jdbc:Java编写
  • elasticsearch-river-mysql:已经很少维护

0. 安装 ElasticSearch

本文不再赘述ES的安装过程。

1. 下载 Logstash

官网链接: Download Logstash 以6.1.1版本为例:

wget https://artifacts.elastic.co/downloads/logstash/logstash-6.1.1.zip
# 等待下载完毕,执行解压
unzip logstash-6.1.1.zip

2. 安装 logstash-input-jdbc 插件

进入解压好的目录 cd logstash-6.1.1

root用户执行:sudo bin/logstash-plugin install logstash-input-jdbc

补充:后面查阅一些文章说logstash5.x以上版本自带了这个插件,但我已经安装了,就不再验证,这里应该可以直接略过此步,后面如果报错再回来折腾。

安装过程会有点慢,我在本地一次成功了,服务器上卡在Installing logstash-input-jdbc不动,第二天早上又试了下又成功了,如果一直不行尝试以下方法,执行:

cd logstash-6.1.1
wget https://github.com/logstash-plugins/logstash-input-jdbc/archive/v4.3.1.zip
unzip logstash-input-jdbc-4.3.1.zip
cd logstash-input-jdbc-4.3.1/

再执行:

vi Gemfile

修改 source 的值 为: "https://ruby.taobao.org""https://gems.ruby-china.org"

保存完继续执行:

vi logstash-input-jdbc.gemspec

找到这一行:

s.files = `git ls-files`.split($\)

改为:

s.files = [".gitignore", "CHANGELOG.md", "Gemfile", "LICENSE", "NOTICE.TXT", "README.md", "Rakefile", "lib/logstash/inputs/jdbc.rb", "lib/logstash/plugin_mixins/jdbc.rb", "logstash-input-jdbc.gemspec", "spec/inputs/jdbc_spec.rb"]

继续执行:

gem build logstash-input-jdbc.gemspec

cd ..
bin/plugin install logstash-input-jdbc-4.3.1.gem
该方法属于手动安装,需要你指定插件版本,参照 plugins-inputs-jdbc官方文档查看插件当前的最新版本号。

3. 关联Mysql数据库

下载一个mysql连接jar包:mysql-connector-java-6.0.6

进入logstash创建一个conf文件,执行:

cd logstash-6.1.1/config
vim jdbc.conf

编辑粘贴以下内容:

input {
        jdbc {
            # 刚才下载的jar包路径
            jdbc_driver_library =>"/opt/mysql-connector-java-6.0.6.jar"
            jdbc_driver_class =>"com.mysql.jdbc.Driver"
            # 数据库地址
            jdbc_connection_string =>"jdbc:mysql://192.168.0.1:3306/myTest?characterEncoding=UTF-8&useSSL=false"
            jdbc_user =>"root"
            jdbc_password =>"123456"
            # 定时查询
            schedule =>"* * * * *"
            # 执行的sql语句,两种形式
            #statement_filepath => "/opt/test.sql"
            statement =>"SELECT * FROM tname"
        }
    }
    output {
        elasticsearch {
            # 转换ES索引
            index =>"name_index"
            document_type =>"name_type"
            # 关联表中的id字段,对应索引id
            document_id =>"%{id}"
            template_overwrite => true
            hosts =>["127.0.0.1:9200"]
        }
    }
选择filepath形式执行sql语句的话需要额外创建一个sql文件,在里面编写sql语句,这里省略。

返回到logstash-6.1.1目录,然后执行:


bin/logstash -f config/jdbc.conf

等待一会,会看到终端输出了刚才写的sql语句,因为开启了schedule,根据以上配置大概30s就会执行一次sql查询,增量更新到ES的索引。需要注意的是,对于更新操作,目前插件都不能实现。

浏览器访问 http://服务器ip:9200/name_index/name_type/_search ,即可看到同步的数据。

最后将进程挂载后台让它一直做定时查询即可。

4. 操作数据

参考官方视图工具 kibana 选择对应版本,下载到本地,解压,进入config/kibana.yml 修改 elasticsearch.url为你服务器的ES地址,运行 bin/kibana 浏览器访问 http://localhost:5601 ,即可在本地连接线上ES服务器,比较常用的功能例如 Dev Tools

Elasticsearch: 权威指南

5. 多个数据表查询

input {
            jdbc {
                jdbc_driver_library =>"/opt/mysql-connector-java-6.0.6.jar"
                jdbc_driver_class =>"com.mysql.jdbc.Driver"
                jdbc_connection_string =>"jdbc::3306/?characterEncoding=UTF-8&useSSL=false"
                jdbc_user =>""
                jdbc_password =>""
                schedule =>"* * * * *"
                statement_filepath => "/opt/order_goods.sql"
                #statement =>"SELECT * FROM tname"
                                        type => "jdbc_order_goods"
            }
                        jdbc {
                jdbc_driver_library =>"/opt/mysql-connector-java-6.0.6.jar"
                jdbc_driver_class =>"com.mysql.jdbc.Driver"
                jdbc_connection_string =>"jdbc::3306/?characterEncoding=UTF-8&useSSL=false"
                jdbc_user =>""
                jdbc_password =>""
                schedule =>"* * * * *"
                statement_filepath => "/opt/user.sql"
                                        type => "jdbc_user"
            }
        }

output {
                if[type] == "jdbc_order_goods"{
        elasticsearch {
                hosts =>["127.0.0.1:9200"]
                                        index =>"order_goods_index"
                document_type =>"order_goods_type"
                document_id =>"%{id}"
        }
    }
    if[type] == "jdbc_user"{
        elasticsearch {
                hosts =>["127.0.0.1:9200"]
                index => "user_index"
                document_type => "user_type"
                document_id => "%{id}"
        }
    }
}
相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
2月前
|
安全 关系型数据库 MySQL
如何将数据从MySQL同步到其他系统
【10月更文挑战第17天】如何将数据从MySQL同步到其他系统
287 0
|
3天前
|
监控 关系型数据库 MySQL
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
34 16
|
1月前
|
搜索推荐 关系型数据库 MySQL
MySQL中的模糊匹配技巧:无需ES的高效实现
在数据库应用中,模糊匹配是一个常见的需求,尤其在处理搜索功能时。虽然Elasticsearch(ES)等搜索引擎在处理文本搜索方面表现出色,但在一些场景下,直接使用MySQL数据库实现模糊匹配也是一个经济且高效的选择。本文将分享如何在不引入ES的情况下,利用MySQL实现模糊匹配的五大步骤和十个实战案例。
120 1
|
1月前
|
自然语言处理 监控 关系型数据库
MySQL模糊匹配技巧:无需ES的高效实现
在数据库应用中,模糊匹配是一个常见的需求,尤其是在不引入Elasticsearch(ES)等外部搜索引擎的情况下。MySQL作为强大的关系型数据库,提供了多种实现模糊匹配的方法。本文将分享如何在MySQL中实现模糊匹配,并提供五大步骤和十个实战案例,帮助你提升查询效率和性能。
169 1
|
2月前
|
Java 关系型数据库 MySQL
mysql5.7 jdbc驱动
遵循上述步骤,即可在Java项目中高效地集成MySQL 5.7 JDBC驱动,实现数据库的访问与管理。
474 1
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
106 0
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
51 0
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
57 0
|
2天前
|
存储 Oracle 关系型数据库
数据库传奇:MySQL创世之父的两千金My、Maria
《数据库传奇:MySQL创世之父的两千金My、Maria》介绍了MySQL的发展历程及其分支MariaDB。MySQL由Michael Widenius等人于1994年创建,现归Oracle所有,广泛应用于阿里巴巴、腾讯等企业。2009年,Widenius因担心Oracle收购影响MySQL的开源性,创建了MariaDB,提供额外功能和改进。维基百科、Google等已逐步替换为MariaDB,以确保更好的性能和社区支持。掌握MariaDB作为备用方案,对未来发展至关重要。
10 3
|
2天前
|
安全 关系型数据库 MySQL
MySQL崩溃保险箱:探秘Redo/Undo日志确保数据库安全无忧!
《MySQL崩溃保险箱:探秘Redo/Undo日志确保数据库安全无忧!》介绍了MySQL中的三种关键日志:二进制日志(Binary Log)、重做日志(Redo Log)和撤销日志(Undo Log)。这些日志确保了数据库的ACID特性,即原子性、一致性、隔离性和持久性。Redo Log记录数据页的物理修改,保证事务持久性;Undo Log记录事务的逆操作,支持回滚和多版本并发控制(MVCC)。文章还详细对比了InnoDB和MyISAM存储引擎在事务支持、锁定机制、并发性等方面的差异,强调了InnoDB在高并发和事务处理中的优势。通过这些机制,MySQL能够在事务执行、崩溃和恢复过程中保持
14 3

推荐镜像

更多