[玩转ES] ES批量/全量导入数据

本文涉及的产品
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS AI 助手,专业版
简介: [玩转ES] ES批量/全量导入数据

方法 / 步骤

一:Logstash实现

1.1 安装插件

# 从Logstash的bin目录下安装输入输出ES和MySQL插件
./logstash-plugin install logstash-output-elasticsearch
./logstash-plugin install logstash-input-jdbc

在这里插入图片描述

将mysql-connector-java-8.0.11.jar copy到logstash/bin/mysql目录下

1.2 配置

1.2.1 pipelines配置

  • logstash主目录下\config\pipelines.yml 放开下面的注释
 - pipeline.id: test
   pipeline.workers: 1
   pipeline.batch.size: 1
   config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"
# - pipeline.id: another_test
   queue.type: persisted
   path.config: "/tmp/logstash/*.config"

1.2.2 JDBC 同步配置

1.2.2.1 全量更新

文件名称: init-mysql2es.conf

input {
    stdin {}
    jdbc{
      type => "classify"
      jdbc_default_timezone => "Asia/Shanghai"
      # mysql 数据库链接,school-edu为数据库名
      jdbc_connection_string => "jdbc:mysql://192.168.11.10:3306/ldd_saas?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC"
      # 用户名和密码
      jdbc_user => "root"
      jdbc_password => "useradmin"
      # 驱动
      jdbc_driver_library => "D:/foundation-server/dev-pkg/elk/logstash-7.0.0/bin/mysql-connector-java-8.0.19.jar"
      # 驱动类名
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      # 是否开启分页
      jdbc_paging_enabled => true
      #指定每页显示50000条
      jdbc_page_size => "50000"
      #是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件
      use_column_value => false
      #执行的sql 文件路径+名称
      #statement_filepath => "D:/foundation-server/dev-pkg/elk/logstash-7.0.0/bin/mysql/jdbc.sql"
      #执行的sql语句
      statement => "SELECT `member_id` AS id, `club_id`, `name`, `sex`, `mobile`, `avatar`, `birthday`, `face_img`, `credit_type`, `credit_no`, `wechat_no`, `nick_name`, `stature`, `weight` FROM member_basics"
      # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
      #schedule => "* * * * *"
    }
}
output {
    elasticsearch  {
      # ES的IP地址及端口
        hosts => ["localhost:9200"]
      # 索引名称
        index => "member_basics"
        #user => "elastic"
        #password => "changeme"
        #document_type => "_doc"
      # 自增ID 需要关联的数据库中有有一个id字段,对应索引的id号
        document_id => "%{id}"
    }
    stdout {
     # JSON格式输出
        codec => json_lines
    }
}
1.2.2.2 增量更新

文件名称: sync-mysql2es.conf

#logstash输入配置
input {
  #jdbc输入配置,用来指定mysql中需要同步的数据查询SQL及同步周期
  jdbc {
    jdbc_driver_library => "<path>/mysql-connector-java-8.0.16.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
    jdbc_user => <my username>
    jdbc_password => <my password>
    # 是否开启分页
    jdbc_paging_enabled => true
     # 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件
    use_column_value => true
    # 用来控制增量更新的字段,一般是自增id或者创建、更新时间,注意这里要采用sql语句中select采用的字段别名
    tracking_column => "unix_ts_in_secs"
    # tracking_column 对应字段的类型
    tracking_column_type => "numeric"
    # 设置定时任务间隔  含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务,这里设置为每5分钟同步一次
    schedule => "*/5 * * * * *"
     # 同步数据的查询sql语句
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
  }
}
#logstash输入数据的字段匹配和数据过滤
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}
#logstash输出配置
output {
  # 采用stdout可以将同步数据输出到控制台,主要是调试阶段使用
  stdout { codec =>  "rubydebug"}

  # 指定输出到ES的具体索引
  elasticsearch {
      index => "rdbms_sync_idx"
      document_id => "%{[@metadata][_id]}"
  }
}

1.2.3 运行脚本

# 全量构建 run-init2es.bat
logstash -f init-mysql2es.conf
#增量构建 文件名称: run-sync2es.bat
logstash -f sync-mysql2es.conf
  • 导入成功

在这里插入图片描述

  • kibana 查看已经导入成功

在这里插入图片描述

参考资料 & 致谢

[1] mysql (全量)数据导入到 elasticsearch
[2] 通过Logstash实现mysql数据定时增量同步到ES

相关实践学习
以电商场景为例搭建AI语义搜索应用
本实验旨在通过阿里云Elasticsearch结合阿里云搜索开发工作台AI模型服务,构建一个高效、精准的语义搜索系统,模拟电商场景,深入理解AI搜索技术原理并掌握其实现过程。
ElasticSearch 最新快速入门教程
本课程由千锋教育提供。全文搜索的需求非常大。而开源的解决办法Elasricsearch(Elastic)就是一个非常好的工具。目前是全文搜索引擎的首选。本系列教程由浅入深讲解了在CentOS7系统下如何搭建ElasticSearch,如何使用Kibana实现各种方式的搜索并详细分析了搜索的原理,最后讲解了在Java应用中如何集成ElasticSearch并实现搜索。 &nbsp;
目录
相关文章
|
数据可视化 JavaScript 前端开发
API工具--Apifox和Postman对比(区别)
Postman和Apifox有什么区别?他们之间分别有什么优势,感兴趣的同学可以继续往下看。 不吹不黑,只列功能,纯客观比对。
API工具--Apifox和Postman对比(区别)
|
2月前
|
人工智能 自然语言处理 前端开发
SpringAI+DeepSeek大模型应用开发
SpringAI整合主流大模型,支持对话、函数调用与RAG,提供统一API,简化开发。涵盖多模态、流式传输、会话记忆等功能,助力快速构建AI应用。
|
存储 Kubernetes 调度
Kubernetes 中存储使用介绍(PV、PVC和StorageClass)
在 Kubernetes 中的应用,都是以 Pod 的形式运行的,当我们要是在 Kubernetes 上运行一些需要存放数据的应用时,便需要关注应用存放的数据是否安全可靠。因为 Pod 是有生命周期的,那么也就是说当 Pod 被删除或重启后,Pod 里面所运行的数据也会随之消失。
2934 0
Kubernetes 中存储使用介绍(PV、PVC和StorageClass)
|
SQL 分布式计算 NoSQL
大数据-170 Elasticsearch 云服务器三节点集群搭建 测试运行
大数据-170 Elasticsearch 云服务器三节点集群搭建 测试运行
390 4
|
存储 域名解析 运维
阿里云轻量应用服务器82元与298元年解析与选择参考
目前,阿里云推出的两款特惠轻量应用服务器——82元1年的2核2G3M套餐和298元1年的2核4G4M套餐,吸引了众多用户的关注。本文将深入解析这两款套餐的具体配置、优势、应用场景及选购建议,以供参考。
阿里云轻量应用服务器82元与298元年解析与选择参考
|
搜索推荐 关系型数据库 MySQL
MySQL中的模糊匹配技巧:无需ES的高效实现
在数据库应用中,模糊匹配是一个常见的需求,尤其在处理搜索功能时。虽然Elasticsearch(ES)等搜索引擎在处理文本搜索方面表现出色,但在一些场景下,直接使用MySQL数据库实现模糊匹配也是一个经济且高效的选择。本文将分享如何在不引入ES的情况下,利用MySQL实现模糊匹配的五大步骤和十个实战案例。
1091 1
|
关系型数据库 MySQL Java
MySQL数据锁:Record Lock,Gap Lock 和 Next-Key Lock
本文基于 MySQL 8.0.30 版本及 InnoDB 引擎,深入解析三种行锁机制:记录锁(Record Lock)、间隙锁(Gap Lock)和临键锁(Next-key Lock)。记录锁锁定索引记录,确保事务唯一修改;间隙锁锁定索引间的间隙,防止新记录插入;临键锁结合两者,锁定范围并记录自身,有效避免幻读现象。通过具体示例展示了不同锁的作用机制及其在并发控制中的应用。
1367 2
|
缓存 边缘计算 应用服务中间件
一篇文章让你搞懂到底什么是 CDN
一篇文章让你搞懂到底什么是 CDN
1779 1
|
运维 Kubernetes Linux
10分钟搭建Kubernetes容器集群平台(kubeadm)
10分钟搭建Kubernetes容器集群平台(kubeadm)
vcpkg安装软件包时自定义编译选项
vcpkg安装软件包时自定义编译选项
1185 0