【最佳实践】如何使用 Elasticsearch ingest 节点来丰富日志和指标

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 丰富化是将权威来源的数据合并到文档中的过程,当将这些数据导入到 Elasticsearch 中时,并用其他信息丰富文档,通常可以帮助我们更好的对信息进行搜索或查看数据。

从历史上看,数据丰富功能仅在 Logstash 中可用,但由于 Elasticsearch 7.5.0 中引入了 enrich 处理器,因此可以直接在 Elasticsearch 中进行丰富,而无需配置单独的服务/系统。如果你想知道在 Logstash 中是如何实现,那么请参阅我之前的文章 “Logstash:运用 jdbc_streaming 来丰富我们的数据”。

由于通常用于丰富的主数据通常是在 CSV 文件中创建的,因此在此博客中,我们将逐步说明如何使用 CSV 文件中的数据将在摄取节点上运行的 enrich 处理器用于丰富数据。

样本 CSV 数据

我们可以使用阿里云Elasticsearch中的 Kibana ,或通过 ECS 自建 ELK,导入以下 CSV 格式的示例主数据,然后在将文档吸收到 Elasticsearch 中时用于丰富文档。 对于本博客中给出的示例,我们将主数据存储在一个名为 test.csv 的文件中。 此数据代表组织清单中的设备。

test.csv

"Device ID","Device Location","Device Owner","Device Type"
"device1","London","Engineering","Computer"
"device2","Toronto","Consulting","Mouse"
"device3","Winnipeg","Sales","Computer"
"device4","Barcelona","Engineering","Phone"
"device5","Toronto","Consulting","Computer"
"device6","London","Consulting","Computer"

请注意,CSV 数据不应包含任何其他空格,因为当前版本的 Data Visualizer 需要精确格式化数据。 在此 github 问题 中对此进行了记录。

将 CSV 数据导入 Elasticsearch

我们可以直接使用 Kibana 来把数据导入。打开 Kibana:

image.png

点击上面的 Import a CSV, NDJSON, or log file 链接:、

image.png

点击 Select or drag and drop a file, 然后选择刚才我们创建的 test.csv 文件:

image.png


点击 Import 按钮:

image.png

我们把导入的这个 index 的名字叫做 master_data_from_csv。点击 Import 按钮:

image.png

这样就完成了我们的 master_data_from_csv 的索引创建。我们可以在上面屏幕的下方的四个选项中任意选择一个来查看导入的数据。

利用我们的主数据丰富文档

在本节中,我们演示如何使用 enrich Processor 将主数据合并到输入数据流中的文档中。关于 enrich processor,我之前有另外一篇文章有详述“Elasticsearch:enrich processor (7.5发行版新功能)”。

image.png

第一步是创建一个丰富的策略,该策略定义我们将使用哪个字段将主数据与输入数据流中的文档进行匹配。 下面提供了适用于我们的数据的示例策略:

PUT /_enrich/policy/enrich-devices-policy
{
  "match": {
    "indices": "master_data_from_csv",
    "match_field": "Device ID",
    "enrich_fields": [
      "Device Location",
      "Device Owner",
      "Device Type"
    ]
  }
}

运行上面的策略。然后,我们使用 execute enrich policy API 为该策略创建 enrich 索引:

PUT /_enrich/policy/enrich-devices-policy/_execute

接下来,我们创建一个使用我们的丰富策略的 ingest pipeline。

PUT /_ingest/pipeline/device_lookup
{
  "description": "Enrich device information",
  "processors": [
    {
      "enrich": {
        "policy_name": "enrich-devices-policy",
        "field": "device_id",
        "target_field": "my_enriched_data",
        "max_matches": "1"
      }
    }
  ]
}

我们插入一个文档,并让它使用上面定义的 ingest pipeline,如下所示:

PUT /device_index/_doc/1?pipeline=device_lookup
{
  "device_id": "device1",
  "other_field": "some value"
}

我们可以使用 GET API 查看导入的文档,如下所示:

GET device_index/_doc/1

{
  "_index" : "device_index",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "my_enriched_data" : {
      "Device Location" : "London",
      "Device Owner" : "Engineering",
      "Device ID" : "device1",
      "Device Type" : "Computer"
    },
    "device_id" : "device1",
    "other_field" : "some value"
  }
}

在上面,我们可以看出来在返回的文档信息中,有多一个叫做 my_enriched_data 的字段。它包含了 Device Location, Device Owner, Device ID 及 Device Type。这些信息都来自于我们之前导入的 test.csv 文档信息。enrich processor 通过关联 device_id 为 device1 而从 master_data_from_csv 索引中获得这些信息。也就是说我们的数据变得更多了,这也就是我们之前说的丰富了。

在索引设置中指定pipeline

在上面,我们通过导入时指定的 pipeline 来进行调用 enrich processor,但是在实际的应用场景中,我们更加倾向于把这个配置与 index 的设置中,而不是在请求的url中来指定特定的 pipeline。我们可以通过在 index 的配置中添加 index.default_pipeline 来完成。

PUT device_index/_settings
{
  "index.default_pipeline": "device_lookup"
}

现在,发送到 device_index 的所有文档都将通过 device_lookup 管道,而无需在 URL 中使用 pipeline=device_lookup。 我们可以使用下面的 PUT 命令来验证它是否正常工作。

PUT /device_index/_doc/2
{
  "device_id": "device2",
  "other_field": "some value"
}

执行以下命令以查看我们刚刚提取的文档:

GET device_index/_doc/2

那么你可以看到如下所示的文档:

{
  "_index" : "device_index",
  "_type" : "_doc",
  "_id" : "2",
  "_version" : 1,
  "_seq_no" : 1,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "my_enriched_data" : {
      "Device Location" : "Toronto",
      "Device Owner" : "Consulting",
      "Device ID" : "device2",
      "Device Type" : "Mouse"
    },
    "device_id" : "device2",
    "other_field" : "some value"
  }
}

结论

通常需要在导入时丰富文档,以确保 Elasticsearch 中的文档包含搜索或查看它们所需的信息。 在此博客中,我们演示了在 ingest 节点上运行的 enrich 处理器如何使用 CSV 数据进行扩充,这对于在将主数据吸收到 Elasticsearch 中时将主数据合并到文档中非常有用。

声明:本文由原文作者“ Elastic 中国社区布道师——刘晓国”授权转载,对未经许可擅自使用者,保留追究其法律责任的权利。

image.png

阿里云Elastic Stack】100%兼容开源ES,独有9大能力,提供免费 X-pack服务(单节点价值$6000)

相关活动


更多折扣活动,请访问阿里云 Elasticsearch 官网

阿里云 Elasticsearch 商业通用版,1核2G ,SSD 20G首月免费
阿里云 Logstash 2核4G首月免费


image.png

image.png

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
1月前
|
存储 运维 监控
超越传统模型:从零开始构建高效的日志分析平台——基于Elasticsearch的实战指南
【10月更文挑战第8天】随着互联网应用和微服务架构的普及,系统产生的日志数据量日益增长。有效地收集、存储、检索和分析这些日志对于监控系统健康状态、快速定位问题以及优化性能至关重要。Elasticsearch 作为一种分布式的搜索和分析引擎,以其强大的全文检索能力和实时数据分析能力成为日志处理的理想选择。
105 6
|
2月前
|
机器学习/深度学习 存储 监控
Elasticsearch 在日志分析中的应用
【9月更文第2天】随着数字化转型的推进,日志数据的重要性日益凸显。日志不仅记录了系统的运行状态,还提供了宝贵的洞察,帮助企业改进产品质量、优化用户体验以及加强安全防护。Elasticsearch 作为一个分布式搜索和分析引擎,因其出色的性能和灵活性,成为了日志分析领域的首选工具之一。本文将探讨如何使用 Elasticsearch 作为日志分析平台的核心组件,并详细介绍 ELK(Elasticsearch, Logstash, Kibana)栈的搭建和配置流程。
266 4
|
3月前
|
存储 缓存 监控
|
13天前
|
存储 SQL 监控
|
1月前
|
存储 运维 监控
Elasticsearch Serverless 高性价比智能日志分析关键技术解读
本文解析了Elasticsearch Serverless在智能日志分析领域的关键技术、优势及应用价值。
Elasticsearch Serverless 高性价比智能日志分析关键技术解读
|
10天前
|
存储 监控 索引
Elasticsearch 节点
【11月更文挑战第3天】
22 3
|
13天前
|
自然语言处理 监控 数据可视化
|
13天前
|
运维 监控 安全
|
17天前
|
存储 监控 安全
|
16天前
|
存储 数据采集 监控
开源日志分析Elasticsearch
【10月更文挑战第22天】
44 5

相关产品

  • 检索分析服务 Elasticsearch版