阿里云ElasticSearch使用LogStash通过公网将MySQL数据导入

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: 阿里云Logstash(简称Logstash)作为服务器端的数据处理管道,提供了100%兼容开源Logstash的能力。Logstash能够动态地从多个来源采集数据、转换数据,并且将数据存储到所选择的位置。通过输入、过滤和输出插件,Logstash可以对任何类型的事件加工和转换。本文主要演示如何基于公网方式将MySQL数据通过LogStash管道导入到ElasticSearch实例中。

Step By Step

1、创建ES实例和LogStash实例
图片.png

2、为LogStash配置SNAT

LogStash默认仅有VPC内网环境,如果想使用公网的数据源,需要配置NAT网关,本示例是从公网MySQL读取数据,所以需要配置SNAT。
图片.png

3、将SNAT IP加入MySQL网络白名单
图片.png

4、数据库基本信息获取
图片.png

5、数据库中建表和插入数据

DROP TABLE IF EXISTS `doctor_advisory_price_1`;
CREATE TABLE `doctor_advisory_price_1` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `doctor_id` bigint(20) NOT NULL COMMENT '医生ID',
  `advisory_price` int(10) NOT NULL COMMENT '咨询价格:分',
  `gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `gmt_modify` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=304 DEFAULT CHARSET=utf8 COMMENT='医生咨询自定义价格表';

-- ----------------------------
-- Records of doctor_advisory_price
-- ----------------------------
INSERT INTO `doctor_advisory_price_1` VALUES (1, 123456, 2000, '2018-09-05 16:34:09', '2018-09-05 16:37:44');
INSERT INTO `doctor_advisory_price_1` VALUES (2, 1823784, 100, '2018-09-11 11:25:34', '2019-07-02 15:44:24');
INSERT INTO `doctor_advisory_price_1` VALUES (3, 1000247, 0, '2018-09-11 11:41:31', '2018-12-18 17:44:54');
INSERT INTO `doctor_advisory_price_1` VALUES (4, 44612299, 100, '2018-09-11 13:55:33', '2019-12-26 15:40:27');
INSERT INTO `doctor_advisory_price_1` VALUES (5, 44612298, 300, '2018-09-11 14:33:48', '2019-01-18 14:32:31');
INSERT INTO `doctor_advisory_price_1` VALUES (6, 61823709, 10000, '2018-09-11 16:28:57', '2018-09-11 16:28:57');
INSERT INTO `doctor_advisory_price_1` VALUES (7, 1899974, 10000, '2018-09-11 16:30:03', '2018-09-11 16:30:03');
INSERT INTO `doctor_advisory_price_1` VALUES (8, 61823711, 10000, '2018-09-11 17:16:07', '2018-09-11 17:16:07');
INSERT INTO `doctor_advisory_price_1` VALUES (9, 1610524, 0, '2018-09-11 17:31:50', '2019-08-22 14:16:45');
INSERT INTO `doctor_advisory_price_1` VALUES (10, 61823712, 2500, '2018-09-11 17:32:51', '2018-09-12 11:29:54');
INSERT INTO `doctor_advisory_price_1` VALUES (11, 61913713, 10000, '2018-09-12 11:17:38', '2018-09-12 11:17:38');
INSERT INTO `doctor_advisory_price_1` VALUES (12, 1610440, 100, '2018-09-12 14:47:32', '2018-09-12 14:51:36');
INSERT INTO `doctor_advisory_price_1` VALUES (13, 1902393, 500, '2018-09-13 11:46:29', '2018-09-13 11:46:29');
INSERT INTO `doctor_advisory_price_1` VALUES (14, 61913713, 10000, '2018-09-12 11:17:38', '2018-09-12 11:17:38');
INSERT INTO `doctor_advisory_price_1` VALUES (15, 1610440, 100, '2018-09-12 14:47:32', '2018-09-12 14:51:36');
INSERT INTO `doctor_advisory_price_1` VALUES (16, 1902393, 500, '2018-09-13 11:46:29', '2018-09-13 11:46:29');
INSERT INTO `doctor_advisory_price_1` VALUES (17, 61913713, 10000, '2018-09-12 11:17:38', '2018-09-12 11:17:38');
INSERT INTO `doctor_advisory_price_1` VALUES (18, 1610440, 100, '2018-09-12 14:47:32', '2018-09-12 14:51:36');
INSERT INTO `doctor_advisory_price_1` VALUES (19, 1902393, 500, '2018-09-13 11:46:29', '2018-09-13 11:46:29');
INSERT INTO `doctor_advisory_price_1` VALUES (20, 61913713, 10000, '2018-09-12 11:17:38', '2018-09-12 11:17:38');
INSERT INTO `doctor_advisory_price_1` VALUES (21, 1610440, 100, '2018-09-12 14:47:32', '2018-09-12 14:51:36');
INSERT INTO `doctor_advisory_price_1` VALUES (22, 1902393, 500, '2018-09-13 11:46:29', '2018-09-13 11:46:29');
INSERT INTO `doctor_advisory_price_1` VALUES (23, 32321043, 10000, '2018-09-13 18:20:19', '2018-09-13 18:20:19');
INSERT INTO `doctor_advisory_price_1` VALUES (24, 62023722, 0, '2018-09-14 10:28:00', '2018-09-14 10:28:00');
INSERT INTO `doctor_advisory_price_1` VALUES (25, 49522775, 0, '2018-09-14 11:00:23', '2019-05-28 11:47:12');
INSERT INTO `doctor_advisory_price_1` VALUES (26, 50622828, 100, '2018-09-14 14:08:55', '2019-12-27 14:41:14');
INSERT INTO `doctor_advisory_price_1` VALUES (27, 31210890, 100, '2018-09-14 14:27:48', '2019-01-15 14:24:37');
INSERT INTO `doctor_advisory_price_1` VALUES (28, 45822396, 200, '2018-09-14 14:59:14', '2019-01-18 10:25:16');
INSERT INTO `doctor_advisory_price_1` VALUES (29, 47322576, 100, '2018-09-15 10:01:26', '2018-09-15 10:01:26');
INSERT INTO `doctor_advisory_price_1` VALUES (30, 50632833, 0, '2018-09-15 10:09:24', '2018-10-10 16:58:38');

6、LogStash上传插件(mysql-connector-java-8.0.18.jar)
图片.png

7、ElasticSearch开启允许自动创建索引
图片.png

8、pipeline

# input插件需要监听Logstash进程所在节点的端口,请使用8000~9000范围内的端口。
input {
jdbc {
jdbc_driver_library => "/ssd/1/share/<LogStash实例Id>/logstash/current/config/custom/mysql-connector-java-8.0.18.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://<******>.mysql.rds.aliyuncs.com:3306/******?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false"
jdbc_user => "******"
jdbc_password => "<密码>"
schedule => "* * * * *"
statement => "SELECT * from doctor_advisory_price_1"
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "gmt_modify"
last_run_metadata_path => "/ssd/1/<logstash id>/logstash/data/doctor_advisory_price-20201210.txt"
clean_run => false
}
}
filter {

}
output {
  elasticsearch {
hosts => "http://******.elasticsearch.aliyuncs.com:9200"
user => "elastic"
password => "<密码>"
index => "doctor_test_01"
document_id => "%{doctor_id}"
}
  # 支持output中添加file_extend output配置,即可在管道部署完成后直接查看输出结果,进行结果验证与调试
  # 请勿修改系统指定路径,注释或删除file_extend output部分配置,可关闭配置调试。详情见下方提示
  # file_extend {
  #   path => "/ssd/1/<logstash id>/logstash/logs/debug/mysql_to_es3"
  # }
}

9、Kibana 查看
图片.png

参考链接

配置扩展文件
Logstash - 同步MYSQL数据到Elasticsearch

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
6月前
|
存储 关系型数据库 MySQL
MySQL——数据库备份上传到阿里云OSS存储
MySQL——数据库备份上传到阿里云OSS存储
274 0
|
3月前
|
SQL DataWorks 关系型数据库
阿里云 DataWorks 正式支持 SelectDB & Apache Doris 数据源,实现 MySQL 整库实时同步
阿里云数据库 SelectDB 版是阿里云与飞轮科技联合基于 Apache Doris 内核打造的现代化数据仓库,支持大规模实时数据上的极速查询分析。通过实时、统一、弹性、开放的核心能力,能够为企业提供高性价比、简单易用、安全稳定、低成本的实时大数据分析支持。SelectDB 具备世界领先的实时分析能力,能够实现秒级的数据实时导入与同步,在宽表、复杂多表关联、高并发点查等不同场景下,提供超越一众国际知名的同类产品的优秀性能,多次登顶 ClickBench 全球数据库分析性能排行榜。
|
4月前
|
关系型数据库 MySQL API
MySQL 历史数据迁移到 Elasticsearch
MySQL 历史数据迁移到 Elasticsearch
187 4
|
5月前
|
NoSQL 关系型数据库 Redis
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
mall在linux环境下的部署(基于Docker容器),docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongodb、minio详细教程,拉取镜像、运行容器
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
|
5月前
|
JSON 自然语言处理 数据库
ElasticSearch基础1——索引和文档。Kibana,RestClient操作索引和文档+黑马旅游ES库导入
概念、ik分词器、倒排索引、索引和文档的增删改查、RestClient对索引和文档的增删改查
ElasticSearch基础1——索引和文档。Kibana,RestClient操作索引和文档+黑马旅游ES库导入
|
4月前
|
消息中间件 监控 关系型数据库
MySQL数据实时同步到Elasticsearch:技术深度解析与实践分享
在当今的数据驱动时代,实时数据同步成为许多应用系统的核心需求之一。MySQL作为关系型数据库的代表,以其强大的事务处理能力和数据完整性保障,广泛应用于各种业务场景中。然而,随着数据量的增长和查询复杂度的提升,单一依赖MySQL进行高效的数据检索和分析变得日益困难。这时,Elasticsearch(简称ES)以其卓越的搜索性能、灵活的数据模式以及强大的可扩展性,成为处理复杂查询需求的理想选择。本文将深入探讨MySQL数据实时同步到Elasticsearch的技术实现与最佳实践。
332 0
|
6月前
|
关系型数据库 MySQL 网络安全
阿里云安装Mysql
阿里云安装Mysql
459 1
|
1月前
|
关系型数据库 MySQL 数据库连接
数据库连接工具连接mysql提示:“Host ‘172.23.0.1‘ is not allowed to connect to this MySQL server“
docker-compose部署mysql8服务后,连接时提示不允许连接问题解决
|
22天前
|
关系型数据库 MySQL 数据库
Docker Compose V2 安装常用数据库MySQL+Mongo
以上内容涵盖了使用 Docker Compose 安装和管理 MySQL 和 MongoDB 的详细步骤,希望对您有所帮助。
122 42
|
13天前
|
关系型数据库 MySQL 网络安全
如何排查和解决PHP连接数据库MYSQL失败写锁的问题
通过本文的介绍,您可以系统地了解如何排查和解决PHP连接MySQL数据库失败及写锁问题。通过检查配置、确保服务启动、调整防火墙设置和用户权限,以及识别和解决长时间运行的事务和死锁问题,可以有效地保障应用的稳定运行。
67 25