阿里云ElasticSearch使用LogStash通过公网将MySQL数据导入

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 阿里云Logstash(简称Logstash)作为服务器端的数据处理管道,提供了100%兼容开源Logstash的能力。Logstash能够动态地从多个来源采集数据、转换数据,并且将数据存储到所选择的位置。通过输入、过滤和输出插件,Logstash可以对任何类型的事件加工和转换。本文主要演示如何基于公网方式将MySQL数据通过LogStash管道导入到ElasticSearch实例中。

Step By Step

1、创建ES实例和LogStash实例
图片.png

2、为LogStash配置SNAT

LogStash默认仅有VPC内网环境,如果想使用公网的数据源,需要配置NAT网关,本示例是从公网MySQL读取数据,所以需要配置SNAT。
图片.png

3、将SNAT IP加入MySQL网络白名单
图片.png

4、数据库基本信息获取
图片.png

5、数据库中建表和插入数据

DROP TABLE IF EXISTS `doctor_advisory_price_1`;
CREATE TABLE `doctor_advisory_price_1` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `doctor_id` bigint(20) NOT NULL COMMENT '医生ID',
  `advisory_price` int(10) NOT NULL COMMENT '咨询价格:分',
  `gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `gmt_modify` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=304 DEFAULT CHARSET=utf8 COMMENT='医生咨询自定义价格表';

-- ----------------------------
-- Records of doctor_advisory_price
-- ----------------------------
INSERT INTO `doctor_advisory_price_1` VALUES (1, 123456, 2000, '2018-09-05 16:34:09', '2018-09-05 16:37:44');
INSERT INTO `doctor_advisory_price_1` VALUES (2, 1823784, 100, '2018-09-11 11:25:34', '2019-07-02 15:44:24');
INSERT INTO `doctor_advisory_price_1` VALUES (3, 1000247, 0, '2018-09-11 11:41:31', '2018-12-18 17:44:54');
INSERT INTO `doctor_advisory_price_1` VALUES (4, 44612299, 100, '2018-09-11 13:55:33', '2019-12-26 15:40:27');
INSERT INTO `doctor_advisory_price_1` VALUES (5, 44612298, 300, '2018-09-11 14:33:48', '2019-01-18 14:32:31');
INSERT INTO `doctor_advisory_price_1` VALUES (6, 61823709, 10000, '2018-09-11 16:28:57', '2018-09-11 16:28:57');
INSERT INTO `doctor_advisory_price_1` VALUES (7, 1899974, 10000, '2018-09-11 16:30:03', '2018-09-11 16:30:03');
INSERT INTO `doctor_advisory_price_1` VALUES (8, 61823711, 10000, '2018-09-11 17:16:07', '2018-09-11 17:16:07');
INSERT INTO `doctor_advisory_price_1` VALUES (9, 1610524, 0, '2018-09-11 17:31:50', '2019-08-22 14:16:45');
INSERT INTO `doctor_advisory_price_1` VALUES (10, 61823712, 2500, '2018-09-11 17:32:51', '2018-09-12 11:29:54');
INSERT INTO `doctor_advisory_price_1` VALUES (11, 61913713, 10000, '2018-09-12 11:17:38', '2018-09-12 11:17:38');
INSERT INTO `doctor_advisory_price_1` VALUES (12, 1610440, 100, '2018-09-12 14:47:32', '2018-09-12 14:51:36');
INSERT INTO `doctor_advisory_price_1` VALUES (13, 1902393, 500, '2018-09-13 11:46:29', '2018-09-13 11:46:29');
INSERT INTO `doctor_advisory_price_1` VALUES (14, 61913713, 10000, '2018-09-12 11:17:38', '2018-09-12 11:17:38');
INSERT INTO `doctor_advisory_price_1` VALUES (15, 1610440, 100, '2018-09-12 14:47:32', '2018-09-12 14:51:36');
INSERT INTO `doctor_advisory_price_1` VALUES (16, 1902393, 500, '2018-09-13 11:46:29', '2018-09-13 11:46:29');
INSERT INTO `doctor_advisory_price_1` VALUES (17, 61913713, 10000, '2018-09-12 11:17:38', '2018-09-12 11:17:38');
INSERT INTO `doctor_advisory_price_1` VALUES (18, 1610440, 100, '2018-09-12 14:47:32', '2018-09-12 14:51:36');
INSERT INTO `doctor_advisory_price_1` VALUES (19, 1902393, 500, '2018-09-13 11:46:29', '2018-09-13 11:46:29');
INSERT INTO `doctor_advisory_price_1` VALUES (20, 61913713, 10000, '2018-09-12 11:17:38', '2018-09-12 11:17:38');
INSERT INTO `doctor_advisory_price_1` VALUES (21, 1610440, 100, '2018-09-12 14:47:32', '2018-09-12 14:51:36');
INSERT INTO `doctor_advisory_price_1` VALUES (22, 1902393, 500, '2018-09-13 11:46:29', '2018-09-13 11:46:29');
INSERT INTO `doctor_advisory_price_1` VALUES (23, 32321043, 10000, '2018-09-13 18:20:19', '2018-09-13 18:20:19');
INSERT INTO `doctor_advisory_price_1` VALUES (24, 62023722, 0, '2018-09-14 10:28:00', '2018-09-14 10:28:00');
INSERT INTO `doctor_advisory_price_1` VALUES (25, 49522775, 0, '2018-09-14 11:00:23', '2019-05-28 11:47:12');
INSERT INTO `doctor_advisory_price_1` VALUES (26, 50622828, 100, '2018-09-14 14:08:55', '2019-12-27 14:41:14');
INSERT INTO `doctor_advisory_price_1` VALUES (27, 31210890, 100, '2018-09-14 14:27:48', '2019-01-15 14:24:37');
INSERT INTO `doctor_advisory_price_1` VALUES (28, 45822396, 200, '2018-09-14 14:59:14', '2019-01-18 10:25:16');
INSERT INTO `doctor_advisory_price_1` VALUES (29, 47322576, 100, '2018-09-15 10:01:26', '2018-09-15 10:01:26');
INSERT INTO `doctor_advisory_price_1` VALUES (30, 50632833, 0, '2018-09-15 10:09:24', '2018-10-10 16:58:38');

6、LogStash上传插件(mysql-connector-java-8.0.18.jar)
图片.png

7、ElasticSearch开启允许自动创建索引
图片.png

8、pipeline

# input插件需要监听Logstash进程所在节点的端口,请使用8000~9000范围内的端口。
input {
jdbc {
jdbc_driver_library => "/ssd/1/share/<LogStash实例Id>/logstash/current/config/custom/mysql-connector-java-8.0.18.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://<******>.mysql.rds.aliyuncs.com:3306/******?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false"
jdbc_user => "******"
jdbc_password => "<密码>"
schedule => "* * * * *"
statement => "SELECT * from doctor_advisory_price_1"
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "gmt_modify"
last_run_metadata_path => "/ssd/1/<logstash id>/logstash/data/doctor_advisory_price-20201210.txt"
clean_run => false
}
}
filter {

}
output {
  elasticsearch {
hosts => "http://******.elasticsearch.aliyuncs.com:9200"
user => "elastic"
password => "<密码>"
index => "doctor_test_01"
document_id => "%{doctor_id}"
}
  # 支持output中添加file_extend output配置,即可在管道部署完成后直接查看输出结果,进行结果验证与调试
  # 请勿修改系统指定路径,注释或删除file_extend output部分配置,可关闭配置调试。详情见下方提示
  # file_extend {
  #   path => "/ssd/1/<logstash id>/logstash/logs/debug/mysql_to_es3"
  # }
}

9、Kibana 查看
图片.png

参考链接

配置扩展文件
Logstash - 同步MYSQL数据到Elasticsearch

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
7天前
|
存储 关系型数据库 MySQL
MySQL——数据库备份上传到阿里云OSS存储
MySQL——数据库备份上传到阿里云OSS存储
25 0
|
1天前
|
关系型数据库 MySQL 网络安全
阿里云安装Mysql
阿里云安装Mysql
12 1
|
2天前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之ODPS数据怎么Merge到MySQL数据库
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
2天前
|
DataWorks 关系型数据库 MySQL
DataWorks产品使用合集之mysql节点如何插入数据
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4天前
|
关系型数据库 MySQL 数据库
MySQL 复制A的表结构和数据到表B
在MySQL中复制表A至表B可通过不同方法实现。一种是先用`CREATE TABLE B LIKE A;`复制结构,再用`INSERT INTO B SELECT * FROM A;`填充数据。另一种更简便的方法是直接使用`CREATE TABLE B AS SELECT * FROM A;`一次性完成结构和数据的复制。还有一种高级方法是通过`SHOW CREATE TABLE A;`获取表A的创建语句,手动调整后创建表B,如有需要再用`INSERT INTO ... SELECT`复制数据。注意权限问题、跨数据库复制时需指定数据库名,以及大表复制时可能影响性能。
|
7天前
|
消息中间件 数据采集 关系型数据库
大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka
大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka
25 1
|
1天前
|
关系型数据库 MySQL Windows
MySQL数据导入:MySQL 导入 Excel 文件.md
MySQL数据导入:MySQL 导入 Excel 文件.md
|
1天前
|
监控 Cloud Native 关系型数据库
【跨区域PolarDB-MySQL主备互通】:揭秘如何跨越万里实现数据无缝同步,打造坚不可摧的灾备体系!
【8月更文挑战第20天】阿里云PolarDB是一款兼容MySQL协议的云原生数据库服务,提供高性能与高可用性。本文介绍如何在PolarDB-MySQL中实现跨区域主备同步。首先创建主备两个集群,接着通过MySQL复制功能配置同步:获取主节点复制信息、配置备节点复制并启动复制进程。最后,通过`SHOW SLAVE STATUS\G;`监控复制状态,确保数据同步正常。此方法可提升数据的可靠性和可用性,需考虑网络条件对性能的影响。
|
2天前
|
SQL DataWorks 关系型数据库
DataWorks操作报错合集之如何处理数据同步时(mysql->hive)报:Render instance failed
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
2天前
|
canal 关系型数据库 MySQL
"揭秘阿里数据同步黑科技Canal:从原理到实战,手把手教你玩转MySQL数据秒级同步,让你的数据处理能力瞬间飙升,成为技术界的新晋网红!"
【8月更文挑战第18天】Canal是一款由阿里巴巴开源的高性能数据同步系统,它通过解析MySQL的增量日志(Binlog),提供低延迟、可靠的数据订阅和消费功能。Canal模拟MySQL Slave与Master间的交互协议来接收并解析Binary Log,支持数据的增量同步。配置简单直观,包括Server和Instance两层配置。在实战中,Canal可用于数据库镜像、实时备份等多种场景,通过集成Canal Client可实现数据的消费和处理,如更新缓存或写入消息队列。
34 0