从历史上看,数据丰富功能仅在 Logstash 中可用,但由于 Elasticsearch 7.5.0 中引入了 enrich 处理器,因此可以直接在 Elasticsearch 中进行丰富,而无需配置单独的服务/系统。如果你想知道在 Logstash 中是如何实现,那么请参阅我之前的文章 “Logstash:运用 jdbc_streaming 来丰富我们的数据”。
由于通常用于丰富的主数据通常是在 CSV 文件中创建的,因此在此博客中,我们将逐步说明如何使用 CSV 文件中的数据将在摄取节点上运行的 enrich 处理器用于丰富数据。
样本 CSV 数据
我们可以使用阿里云Elasticsearch中的 Kibana ,或通过 ECS 自建 ELK,导入以下 CSV 格式的示例主数据,然后在将文档吸收到 Elasticsearch 中时用于丰富文档。 对于本博客中给出的示例,我们将主数据存储在一个名为 test.csv 的文件中。 此数据代表组织清单中的设备。
test.csv
"Device ID","Device Location","Device Owner","Device Type"
"device1","London","Engineering","Computer"
"device2","Toronto","Consulting","Mouse"
"device3","Winnipeg","Sales","Computer"
"device4","Barcelona","Engineering","Phone"
"device5","Toronto","Consulting","Computer"
"device6","London","Consulting","Computer"
请注意,CSV 数据不应包含任何其他空格,因为当前版本的 Data Visualizer 需要精确格式化数据。 在此 github 问题 中对此进行了记录。
将 CSV 数据导入 Elasticsearch
我们可以直接使用 Kibana 来把数据导入。打开 Kibana:
点击上面的 Import a CSV, NDJSON, or log file 链接:、
点击 Select or drag and drop a file, 然后选择刚才我们创建的 test.csv 文件:
点击 Import 按钮:
我们把导入的这个 index 的名字叫做 master_data_from_csv。点击 Import 按钮:
这样就完成了我们的 master_data_from_csv 的索引创建。我们可以在上面屏幕的下方的四个选项中任意选择一个来查看导入的数据。
利用我们的主数据丰富文档
在本节中,我们演示如何使用 enrich Processor 将主数据合并到输入数据流中的文档中。关于 enrich processor,我之前有另外一篇文章有详述“Elasticsearch:enrich processor (7.5发行版新功能)”。
第一步是创建一个丰富的策略,该策略定义我们将使用哪个字段将主数据与输入数据流中的文档进行匹配。 下面提供了适用于我们的数据的示例策略:
PUT /_enrich/policy/enrich-devices-policy
{
"match": {
"indices": "master_data_from_csv",
"match_field": "Device ID",
"enrich_fields": [
"Device Location",
"Device Owner",
"Device Type"
]
}
}
运行上面的策略。然后,我们使用 execute enrich policy API 为该策略创建 enrich 索引:
PUT /_enrich/policy/enrich-devices-policy/_execute
接下来,我们创建一个使用我们的丰富策略的 ingest pipeline。
PUT /_ingest/pipeline/device_lookup
{
"description": "Enrich device information",
"processors": [
{
"enrich": {
"policy_name": "enrich-devices-policy",
"field": "device_id",
"target_field": "my_enriched_data",
"max_matches": "1"
}
}
]
}
我们插入一个文档,并让它使用上面定义的 ingest pipeline,如下所示:
PUT /device_index/_doc/1?pipeline=device_lookup
{
"device_id": "device1",
"other_field": "some value"
}
我们可以使用 GET API 查看导入的文档,如下所示:
GET device_index/_doc/1
{
"_index" : "device_index",
"_type" : "_doc",
"_id" : "1",
"_version" : 1,
"_seq_no" : 0,
"_primary_term" : 1,
"found" : true,
"_source" : {
"my_enriched_data" : {
"Device Location" : "London",
"Device Owner" : "Engineering",
"Device ID" : "device1",
"Device Type" : "Computer"
},
"device_id" : "device1",
"other_field" : "some value"
}
}
在上面,我们可以看出来在返回的文档信息中,有多一个叫做 my_enriched_data 的字段。它包含了 Device Location, Device Owner, Device ID 及 Device Type。这些信息都来自于我们之前导入的 test.csv 文档信息。enrich processor 通过关联 device_id 为 device1 而从 master_data_from_csv 索引中获得这些信息。也就是说我们的数据变得更多了,这也就是我们之前说的丰富了。
在索引设置中指定pipeline
在上面,我们通过导入时指定的 pipeline 来进行调用 enrich processor,但是在实际的应用场景中,我们更加倾向于把这个配置与 index 的设置中,而不是在请求的url中来指定特定的 pipeline。我们可以通过在 index 的配置中添加 index.default_pipeline 来完成。
PUT device_index/_settings
{
"index.default_pipeline": "device_lookup"
}
现在,发送到 device_index 的所有文档都将通过 device_lookup 管道,而无需在 URL 中使用 pipeline=device_lookup。 我们可以使用下面的 PUT 命令来验证它是否正常工作。
PUT /device_index/_doc/2
{
"device_id": "device2",
"other_field": "some value"
}
执行以下命令以查看我们刚刚提取的文档:
GET device_index/_doc/2
那么你可以看到如下所示的文档:
{
"_index" : "device_index",
"_type" : "_doc",
"_id" : "2",
"_version" : 1,
"_seq_no" : 1,
"_primary_term" : 1,
"found" : true,
"_source" : {
"my_enriched_data" : {
"Device Location" : "Toronto",
"Device Owner" : "Consulting",
"Device ID" : "device2",
"Device Type" : "Mouse"
},
"device_id" : "device2",
"other_field" : "some value"
}
}
结论
通常需要在导入时丰富文档,以确保 Elasticsearch 中的文档包含搜索或查看它们所需的信息。 在此博客中,我们演示了在 ingest 节点上运行的 enrich 处理器如何使用 CSV 数据进行扩充,这对于在将主数据吸收到 Elasticsearch 中时将主数据合并到文档中非常有用。
声明:本文由原文作者“ Elastic 中国社区布道师——刘晓国”授权转载,对未经许可擅自使用者,保留追究其法律责任的权利。
【阿里云Elastic Stack】100%兼容开源ES,独有9大能力,提供免费 X-pack服务(单节点价值$6000)
相关活动
更多折扣活动,请访问阿里云 Elasticsearch 官网
阿里云 Elasticsearch 商业通用版,1核2G ,SSD 20G首月免费
阿里云 Logstash 2核4G首月免费