[玩转ES] ES批量/全量导入数据

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
云数据库 RDS PostgreSQL,高可用系列 2核4GB
简介: [玩转ES] ES批量/全量导入数据

方法 / 步骤

一:Logstash实现

1.1 安装插件

# 从Logstash的bin目录下安装输入输出ES和MySQL插件
./logstash-plugin install logstash-output-elasticsearch
./logstash-plugin install logstash-input-jdbc

在这里插入图片描述

将mysql-connector-java-8.0.11.jar copy到logstash/bin/mysql目录下

1.2 配置

1.2.1 pipelines配置

  • logstash主目录下\config\pipelines.yml 放开下面的注释
 - pipeline.id: test
   pipeline.workers: 1
   pipeline.batch.size: 1
   config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"
# - pipeline.id: another_test
   queue.type: persisted
   path.config: "/tmp/logstash/*.config"

1.2.2 JDBC 同步配置

1.2.2.1 全量更新

文件名称: init-mysql2es.conf

input {
    stdin {}
    jdbc{
      type => "classify"
      jdbc_default_timezone => "Asia/Shanghai"
      # mysql 数据库链接,school-edu为数据库名
      jdbc_connection_string => "jdbc:mysql://192.168.11.10:3306/ldd_saas?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC"
      # 用户名和密码
      jdbc_user => "root"
      jdbc_password => "useradmin"
      # 驱动
      jdbc_driver_library => "D:/foundation-server/dev-pkg/elk/logstash-7.0.0/bin/mysql-connector-java-8.0.19.jar"
      # 驱动类名
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      # 是否开启分页
      jdbc_paging_enabled => true
      #指定每页显示50000条
      jdbc_page_size => "50000"
      #是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件
      use_column_value => false
      #执行的sql 文件路径+名称
      #statement_filepath => "D:/foundation-server/dev-pkg/elk/logstash-7.0.0/bin/mysql/jdbc.sql"
      #执行的sql语句
      statement => "SELECT `member_id` AS id, `club_id`, `name`, `sex`, `mobile`, `avatar`, `birthday`, `face_img`, `credit_type`, `credit_no`, `wechat_no`, `nick_name`, `stature`, `weight` FROM member_basics"
      # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
      #schedule => "* * * * *"
    }
}
output {
    elasticsearch  {
      # ES的IP地址及端口
        hosts => ["localhost:9200"]
      # 索引名称
        index => "member_basics"
        #user => "elastic"
        #password => "changeme"
        #document_type => "_doc"
      # 自增ID 需要关联的数据库中有有一个id字段,对应索引的id号
        document_id => "%{id}"
    }
    stdout {
     # JSON格式输出
        codec => json_lines
    }
}
1.2.2.2 增量更新

文件名称: sync-mysql2es.conf

#logstash输入配置
input {
  #jdbc输入配置,用来指定mysql中需要同步的数据查询SQL及同步周期
  jdbc {
    jdbc_driver_library => "<path>/mysql-connector-java-8.0.16.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
    jdbc_user => <my username>
    jdbc_password => <my password>
    # 是否开启分页
    jdbc_paging_enabled => true
     # 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件
    use_column_value => true
    # 用来控制增量更新的字段,一般是自增id或者创建、更新时间,注意这里要采用sql语句中select采用的字段别名
    tracking_column => "unix_ts_in_secs"
    # tracking_column 对应字段的类型
    tracking_column_type => "numeric"
    # 设置定时任务间隔  含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务,这里设置为每5分钟同步一次
    schedule => "*/5 * * * * *"
     # 同步数据的查询sql语句
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
  }
}
#logstash输入数据的字段匹配和数据过滤
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}
#logstash输出配置
output {
  # 采用stdout可以将同步数据输出到控制台,主要是调试阶段使用
  stdout { codec =>  "rubydebug"}

  # 指定输出到ES的具体索引
  elasticsearch {
      index => "rdbms_sync_idx"
      document_id => "%{[@metadata][_id]}"
  }
}

1.2.3 运行脚本

# 全量构建 run-init2es.bat
logstash -f init-mysql2es.conf
#增量构建 文件名称: run-sync2es.bat
logstash -f sync-mysql2es.conf
  • 导入成功

在这里插入图片描述

  • kibana 查看已经导入成功

在这里插入图片描述

参考资料 & 致谢

[1] mysql (全量)数据导入到 elasticsearch
[2] 通过Logstash实现mysql数据定时增量同步到ES

相关实践学习
以电商场景为例搭建AI语义搜索应用
本实验旨在通过阿里云Elasticsearch结合阿里云搜索开发工作台AI模型服务,构建一个高效、精准的语义搜索系统,模拟电商场景,深入理解AI搜索技术原理并掌握其实现过程。
ElasticSearch 最新快速入门教程
本课程由千锋教育提供。全文搜索的需求非常大。而开源的解决办法Elasricsearch(Elastic)就是一个非常好的工具。目前是全文搜索引擎的首选。本系列教程由浅入深讲解了在CentOS7系统下如何搭建ElasticSearch,如何使用Kibana实现各种方式的搜索并详细分析了搜索的原理,最后讲解了在Java应用中如何集成ElasticSearch并实现搜索。 &nbsp;
目录
相关文章
|
Linux 测试技术
非LVM磁盘分区根目录扩容操作参考
在操作系统部署时,根目录使用的是非LVM分区,后期空间不足又无法扩容。 提供一个思路是新增一块硬盘,创建为逻辑卷,把根目录迁移过去,然后引导以新分区启动。 迁移完成后,旧的根目录可以再格式化为逻辑卷,合并到新磁盘分区。 本操作仅在CentOS Linux release 7.9.2009实验过,其他环境可能不支持。
2425 0
|
4月前
|
传感器 数据采集 消息中间件
怎么处理多源异构数据?搞不清楚就别谈数据融合!
在数据分析中,处理多源异构数据是关键挑战。本文详解其定义、常见问题及融合策略,结合实际场景提供全流程解决方案,助你高效实现数据价值。
|
NoSQL Linux MongoDB
mongodb如何快速进行版本升级
因为3.2内存设置的一些问题,需要升级到3.4,网上查了一下mongo不能跨版本升级,所以只能3.2先升级到3.4,再往上升级。 目前3.4足够我使用,所以升级到3.4就可以了。但是网上升级都比较复杂,因为都是大系统,而我只是一个小系统,单一的服务和数据库。所以网上的文章基本没有,我参考了一些和官网的一些,总结了简单的升级过程。 文章中的系统环境是CentOS
742 0
|
SQL 安全 Java
JavaSecLab 一款综合Java漏洞平台
JavaSecLab是一款综合型Java漏洞学习平台,涵盖多种漏洞场景,提供漏洞代码、修复示例、安全编码规范及友好UI。适用于安全服务、甲方安全培训、安全研究等领域,助于理解漏洞原理与修复方法。支持跨站脚本、SQL注入等多种漏洞类型……
367 2
|
存储 域名解析 运维
阿里云轻量应用服务器82元与298元年解析与选择参考
目前,阿里云推出的两款特惠轻量应用服务器——82元1年的2核2G3M套餐和298元1年的2核4G4M套餐,吸引了众多用户的关注。本文将深入解析这两款套餐的具体配置、优势、应用场景及选购建议,以供参考。
阿里云轻量应用服务器82元与298元年解析与选择参考
|
搜索推荐 关系型数据库 MySQL
MySQL中的模糊匹配技巧:无需ES的高效实现
在数据库应用中,模糊匹配是一个常见的需求,尤其在处理搜索功能时。虽然Elasticsearch(ES)等搜索引擎在处理文本搜索方面表现出色,但在一些场景下,直接使用MySQL数据库实现模糊匹配也是一个经济且高效的选择。本文将分享如何在不引入ES的情况下,利用MySQL实现模糊匹配的五大步骤和十个实战案例。
898 1
|
存储 Kubernetes 调度
Kubernetes 中存储使用介绍(PV、PVC和StorageClass)
在 Kubernetes 中的应用,都是以 Pod 的形式运行的,当我们要是在 Kubernetes 上运行一些需要存放数据的应用时,便需要关注应用存放的数据是否安全可靠。因为 Pod 是有生命周期的,那么也就是说当 Pod 被删除或重启后,Pod 里面所运行的数据也会随之消失。
2748 0
Kubernetes 中存储使用介绍(PV、PVC和StorageClass)
|
机器学习/深度学习 人工智能 监控
AI安防监控
AI安防监控运用人工智能技术分析视频监控,实现对象识别、追踪和预警,广泛应用在安防、交通和工业等领域。它提升了监控的实时性和准确性,降低了人力成本,但面临误判、隐私泄露和高成本等问题。随着市场需求增长,全球安防监控行业将迎来持续发展,需在提升技术的同时保障个人隐私。
675 0
|
人工智能 Linux API
【AI大模型应用开发】【AutoGPT系列】1. 快速上手 - 运行原生AutoGPT or 利用AutoGPT框架开发自己的Agent
【AI大模型应用开发】【AutoGPT系列】1. 快速上手 - 运行原生AutoGPT or 利用AutoGPT框架开发自己的Agent
638 0
|
索引
Elasticsearch update_by_query 语句使用记录
Elasticsearch update_by_query 语句使用记录
671 0