Elasticsearch 8.X 导出 CSV 多种方案,一网打尽!

简介: Elasticsearch 8.X 导出 CSV 多种方案,一网打尽!

1、问题来源

看到 Elasticsearch 数据导出需求,我的第一反应是,好好的为啥要导出?

写入的时候直接写给定格式的文件如 CSV 不就可以了。

其实真实的业务场景,远非我想的这么简单。

Elasticsearch 作为存储库和检索源,相关的输入数据来源早已包罗万象、几乎“无所不能”。

如下图所示:

关系型数据库(MySQL、Oracle、PostgreSQL)、非关系型数据库(MongoDB)、大数据引擎(Kafka、Spark、Hadoop、Hbase、Flink)、内存数据库(Redis)都可以导入 Elasticsearch。

原始数据经过采集到写入 Elasticsearch 之前往往经过预处理、ETL(抽取、转换、加载),核心检索相关的数据落地存储到 Elasticsearch。

某些特定的业务场景(比如:银行业务)需要导出 Elasticsearch 数据,实际是需要导出已经预处理过、已经清洗过的 Elasticsearch 数据。

那么,问题来了?如何导出呢?

2、Elasticsearch 导出数据的方式

以 CSV 格式(导出数据格式)数据为例。

Elasticsearch 导出数据的方式有很多种,包含但不限于:

  • logstash_output_csv
  • 类似 es2csv python 开源工具包导出
  • kibana 可视化导出
  • python、java或shell脚本等自己实现

我们逐个以 Elasticsearch 8.X 版本演示一下。

3、logstash_output_csv 导出

input {
 elasticsearch {
    hosts => "172.121.10.114:9200"
    index => "tianyancha_index"
    query => '
    {
    "query": {
    "match_all": {}
    }
    } 
  '
   ssl => "true"
   user => "elastic"
   password => "changeme"
   ca_file => "/www/...省略.../certs/http_ca.crt"
  }
}
 
output {
  csv {
    # elastic field name
    fields => ["regist_id", "establishment_time", "enttype", "company_name", "company_type"]
    # This is path where we store output.   
    path => "/www/...省略.../sync/tyc_export.csv"
  }
}

结果如下:

生成 CSV 文件如下:

常见报错信息:

[main] Pipeline error {:pipeline_id=>"main", :exception=>#<Manticore::ClientProtocolException: 172.21.0.14:9200 failed to respond>,

解决方案:开启 ssl,默认为false。8.X 必须得手动开启。

4、elasticsearch_tocsv 开源工具包导出

pip3 install elasticsearch-tocsv
  • 工具依赖:python 3.8(含)以上版本。
  • 工具实战:
elasticsearch_tocsv -p 9200 -ho 172.121.10.114 -u elastic -pw changeme -s True -cp '../config/certs/http_ca.crt' -i tianyancha_index -f "@regist_id,establishment_time,scope_business,address,registration_number"

参数含义:

  • -ho:Elasticsearch IP 地址
  • -p: Http 端口号
  • -u:用户名
  • -pw:密码
  • -cp:CRT证书地址
  • -s:SSL 认证,默认为false,8.X 需要开启
  • -i:索引
  • -f:导出的字段

工具导出实现截图:

类似工具很多,拿一个举例,方便大家实操。

5、借助kibana 导出

1 分钟视频就可以搞定。

视频如下,一看就会。

6、自己写代码导出

6.1 Python 程序导出

简单的 Python 程序实现如下。

def client_init():
    ssl_context = create_ssl_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE
 
    es = Elasticsearch(
        hosts=[
            "https://172.121.10.114:9200"
        ],
        ssl_context=ssl_context,
        http_auth=('elastic', 'changeme'),
        use_ssl=True,
        verify_certs=True,
    )
    return es
 
def tianyancha_search():
    client =client_init()
    s = Search(using=client, index="tianyancha_index") \
        .query("match_all")
    response = s.execute()
    sample = response['hits']['hits']
    with open( 'tianyancha_rst.csv', 'w', newline='' ) as csvfile:
        spamwriter = csv.writer( csvfile, delimiter=',',
                                 quotechar='|', quoting=csv.QUOTE_MINIMAL )
 
        spamwriter.writerow( ['regist_id_new', 'company_name', 'business_starttime', 'scope_business'] )
        for hit in sample:
            # fill columns 1, 2, 3 with your data
            col1 = hit._source.regist_id_new
            col2 = hit._source.company_name
            col3 = hit._source.business_starttime
            col4 = hit._source.scope_business
            spamwriter.writerow( [col1, col2, col3, col4] )

不复杂三段论:

  • 1)连接 8.X Elasticsearch 集群;
  • 2)遍历索引获取数据
  • 3)解析数据写入 CSV 文件。

这里只是简单的 from + size 遍历,数据量大可以改成 scroll 实现。

导出 CSV 结果如下:

6.2 Shell 脚本导出

curl -s -XGET -H "Content-Type:application/json" --cacert ../config/certs/http_ca.crt -u elastic:changeme   'https://172.121.10.114:9200/tianyancha_index/_search' -d '
    {"from": 0,
    "size": 2,
    "query": {
        "match_all": {}
    }
  }' | jq -r '["regist_id", "establishment_time", "scope_business", "address", "registration_number"],(.hits.hits[] | 
  [._source.regist_id // "", ._source.establishment_time // "", ._source.scope_business // "", ._source.address // "", ._source.registration_number // ""]) | @csv' > tyc_es2csv.csv

解释一下:

jq 是 shell 脚本下的 json 解析工具。

["regist_id", ****, "registration_number"]代表以数组形式自定义输出多项。

jq 使用细节可以查看帮助手册:https://stedolan.github.io/jq/tutorial/

shell 脚本导出 CSV 如下:

7、小结

能导出 Elasticsearch 方案有 N 多种,本文仅是抛砖引玉。

导出方案如何选型?

  • 根据业务需求,如果不想写代码可以借助第三方工具实现。
  • 如果想使用 ELK 组件,推荐使用 logstash。
  • 如果仅自己有针对的实现,可以 Python 脚本、Shell 脚本都可以。

更多方案,欢迎留言交流。


更短时间更快习得更多干货!

中国50%+Elastic认证专家出自于此!

在不确定的时代,寻求确定性

比同事抢先一步学习进阶干货!


相关实践学习
以电商场景为例搭建AI语义搜索应用
本实验旨在通过阿里云Elasticsearch结合阿里云搜索开发工作台AI模型服务,构建一个高效、精准的语义搜索系统,模拟电商场景,深入理解AI搜索技术原理并掌握其实现过程。
ElasticSearch 最新快速入门教程
本课程由千锋教育提供。全文搜索的需求非常大。而开源的解决办法Elasricsearch(Elastic)就是一个非常好的工具。目前是全文搜索引擎的首选。本系列教程由浅入深讲解了在CentOS7系统下如何搭建ElasticSearch,如何使用Kibana实现各种方式的搜索并详细分析了搜索的原理,最后讲解了在Java应用中如何集成ElasticSearch并实现搜索。 &nbsp;
相关文章
|
存储 JSON 数据格式
Elasticsearch 8.X 可以按照数组下标取数据吗?
Elasticsearch 8.X 可以按照数组下标取数据吗?
|
12月前
|
存储 监控 数据可视化
可观测性方案怎么选?SelectDB vs Elasticsearch vs ClickHouse
基于 SelectDB 的高性能倒排索引、高吞吐量写入和高压缩存储,用户可以构建出性能高于Elasticsearch 10 倍的可观测性平台,并支持国内外多个云上便捷使用 SelectDB Cloud 的开箱即用服务。
581 8
可观测性方案怎么选?SelectDB vs Elasticsearch vs ClickHouse
|
存储 SQL Apache
为什么 Apache Doris 是比 Elasticsearch 更好的实时分析替代方案?
本文将从技术选型的视角,从开放性、系统架构、实时写入、实时存储、实时查询等多方面,深入分析 Apache Doris 与 Elasticsearch 的能力差异及性能表现
1625 17
为什么 Apache Doris 是比 Elasticsearch 更好的实时分析替代方案?
|
JSON 监控 Java
Elasticsearch 8.X reindex 源码剖析及提速指南
Elasticsearch 8.X reindex 源码剖析及提速指南
|
数据采集 人工智能 安全
阿里云Elasticsearch 企业级AI搜索方案发布
本文从AI搜索落地的挑战、阿里云在RAG场景的实践、效果提升三个方面,深度解读阿里云Elasticsearch 企业级AI搜索方案。
1507 8
|
存储 数据处理 索引
Elasticsearch 8.X 小技巧:使用存储脚本优化数据索引与转换过程
Elasticsearch 8.X 小技巧:使用存储脚本优化数据索引与转换过程
|
存储 监控 安全
Elasticsearch 8.X 集群 SSL 证书到期了,怎么更换?
Elasticsearch 8.X 集群 SSL 证书到期了,怎么更换?
|
运维 架构师 搜索推荐
7 年+积累、 Elastic 创始人Shay Banon 等 15 位专家推荐的 Elasticsearch 8.X新书已上线...
7 年+积累、 Elastic 创始人Shay Banon 等 15 位专家推荐的 Elasticsearch 8.X新书已上线...
|
存储 机器学习/深度学习 搜索推荐
Elasticsearch 8.X 向量检索和普通检索能否实现组合检索?如何实现?
Elasticsearch 8.X 向量检索和普通检索能否实现组合检索?如何实现?
|
API 索引
近期,几个典型 Elasticsearch 8.X 问题及方案探讨
近期,几个典型 Elasticsearch 8.X 问题及方案探讨