1、问题来源
看到 Elasticsearch 数据导出需求,我的第一反应是,好好的为啥要导出?
写入的时候直接写给定格式的文件如 CSV 不就可以了。
其实真实的业务场景,远非我想的这么简单。
Elasticsearch 作为存储库和检索源,相关的输入数据来源早已包罗万象、几乎“无所不能”。
如下图所示:
关系型数据库(MySQL、Oracle、PostgreSQL)、非关系型数据库(MongoDB)、大数据引擎(Kafka、Spark、Hadoop、Hbase、Flink)、内存数据库(Redis)都可以导入 Elasticsearch。
原始数据经过采集到写入 Elasticsearch 之前往往经过预处理、ETL(抽取、转换、加载),核心检索相关的数据落地存储到 Elasticsearch。
某些特定的业务场景(比如:银行业务)需要导出 Elasticsearch 数据,实际是需要导出已经预处理过、已经清洗过的 Elasticsearch 数据。
那么,问题来了?如何导出呢?
2、Elasticsearch 导出数据的方式
以 CSV 格式(导出数据格式)数据为例。
Elasticsearch 导出数据的方式有很多种,包含但不限于:
- logstash_output_csv
- 类似 es2csv python 开源工具包导出
- kibana 可视化导出
- python、java或shell脚本等自己实现
我们逐个以 Elasticsearch 8.X 版本演示一下。
3、logstash_output_csv 导出
input { elasticsearch { hosts => "172.121.10.114:9200" index => "tianyancha_index" query => ' { "query": { "match_all": {} } } ' ssl => "true" user => "elastic" password => "changeme" ca_file => "/www/...省略.../certs/http_ca.crt" } } output { csv { # elastic field name fields => ["regist_id", "establishment_time", "enttype", "company_name", "company_type"] # This is path where we store output. path => "/www/...省略.../sync/tyc_export.csv" } }
结果如下:
生成 CSV 文件如下:
常见报错信息:
[main] Pipeline error {:pipeline_id=>"main", :exception=>#<Manticore::ClientProtocolException: 172.21.0.14:9200 failed to respond>,
解决方案:开启 ssl,默认为false。8.X 必须得手动开启。
4、elasticsearch_tocsv 开源工具包导出
- 工具名称:elasticsearch_tocsv
- 工具地址:https://pypi.org/project/elasticsearch-tocsv/
- 工具安装方式:
pip3 install elasticsearch-tocsv
- 工具依赖:python 3.8(含)以上版本。
- 工具实战:
elasticsearch_tocsv -p 9200 -ho 172.121.10.114 -u elastic -pw changeme -s True -cp '../config/certs/http_ca.crt' -i tianyancha_index -f "@regist_id,establishment_time,scope_business,address,registration_number"
参数含义:
- -ho:Elasticsearch IP 地址
- -p: Http 端口号
- -u:用户名
- -pw:密码
- -cp:CRT证书地址
- -s:SSL 认证,默认为false,8.X 需要开启
- -i:索引
- -f:导出的字段
工具导出实现截图:
类似工具很多,拿一个举例,方便大家实操。
5、借助kibana 导出
1 分钟视频就可以搞定。
视频如下,一看就会。
6、自己写代码导出
6.1 Python 程序导出
简单的 Python 程序实现如下。
def client_init(): ssl_context = create_ssl_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE es = Elasticsearch( hosts=[ "https://172.121.10.114:9200" ], ssl_context=ssl_context, http_auth=('elastic', 'changeme'), use_ssl=True, verify_certs=True, ) return es def tianyancha_search(): client =client_init() s = Search(using=client, index="tianyancha_index") \ .query("match_all") response = s.execute() sample = response['hits']['hits'] with open( 'tianyancha_rst.csv', 'w', newline='' ) as csvfile: spamwriter = csv.writer( csvfile, delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL ) spamwriter.writerow( ['regist_id_new', 'company_name', 'business_starttime', 'scope_business'] ) for hit in sample: # fill columns 1, 2, 3 with your data col1 = hit._source.regist_id_new col2 = hit._source.company_name col3 = hit._source.business_starttime col4 = hit._source.scope_business spamwriter.writerow( [col1, col2, col3, col4] )
不复杂三段论:
- 1)连接 8.X Elasticsearch 集群;
- 2)遍历索引获取数据
- 3)解析数据写入 CSV 文件。
这里只是简单的 from + size 遍历,数据量大可以改成 scroll 实现。
导出 CSV 结果如下:
6.2 Shell 脚本导出
curl -s -XGET -H "Content-Type:application/json" --cacert ../config/certs/http_ca.crt -u elastic:changeme 'https://172.121.10.114:9200/tianyancha_index/_search' -d ' {"from": 0, "size": 2, "query": { "match_all": {} } }' | jq -r '["regist_id", "establishment_time", "scope_business", "address", "registration_number"],(.hits.hits[] | [._source.regist_id // "", ._source.establishment_time // "", ._source.scope_business // "", ._source.address // "", ._source.registration_number // ""]) | @csv' > tyc_es2csv.csv
解释一下:
jq 是 shell 脚本下的 json 解析工具。
["regist_id", ****, "registration_number"]代表以数组形式自定义输出多项。
jq 使用细节可以查看帮助手册:https://stedolan.github.io/jq/tutorial/
shell 脚本导出 CSV 如下:
7、小结
能导出 Elasticsearch 方案有 N 多种,本文仅是抛砖引玉。
导出方案如何选型?
- 根据业务需求,如果不想写代码可以借助第三方工具实现。
- 如果想使用 ELK 组件,推荐使用 logstash。
- 如果仅自己有针对的实现,可以 Python 脚本、Shell 脚本都可以。
更多方案,欢迎留言交流。
更短时间更快习得更多干货!
中国50%+Elastic认证专家出自于此!
在不确定的时代,寻求确定性!
比同事抢先一步学习进阶干货!