Elasticsearch enrich processor

本文涉及的产品
Elasticsearch Serverless通用抵扣包,测试体验金 200元
简介: Elasticsearch enrich processor

enrich processor 简介

ingest pipeline 可以在传入的文档被索引之前,对文档进行预处理,通过 processor 中定义的一系列规则来修改文档的内容(例如大小写转换等)。

在 Elasticsearch 7.5 版本引入了 enrich processor,可以将现有索引(source index)中的数据添加到传入的文档(incoming document)中。

比如,你可以在如下的场景中用到:

  • 根据已知的 IP 地址识别 Web 服务或供应商。
  • 根据产品 ID 将产品信息添加到零售订单中。
  • 根据电子邮件地址补充联系信息。
  • 根据用户坐标添加邮政编码。

使用 enrich processor

使用 enrich processor 有如下几个步骤:

  • 1.添加 enrich data:添加 document (enrich data)到一个或者多个的 source index 中,这些 document 中应包含之后要添加到 incoming documents 中的数据。
  • 2.创建 enrich policy:enrich policy 中应至少包含如下参数:

    • 指定source index的。
    • 指定 incoming documents 和 source index 用于匹配的属性。
    • 指定要添加到 incoming documents 中的属性。
  • 3.执行 enrich policy:执行完后会自动创建相应的 enrich index, enrich index 和普通索引不同,进行了优化。
  • 4.在 ingest pipeline 使用 enrich processor:enrich processor 使用 enrich index 来查询。

背景说明

source index 的内容如下:

loc num company
广东省 A1001 腾讯
上海市 B1001 Bilibili
浙江省 C1001 阿里巴巴

incoming document 传入的文档如下,通过 num 字段查到对应 source index 中的 loc 的值,添加到 incoming document 中新增 enrich_loc 属性中。

num company
A1001 腾讯
B1001 Bilibili
C1001 阿里巴巴

第一步:添加 enrich data

通过 _bulk API 批量添加文档到 location 索引,这些文档和普通的文档一样。

POST _bulk
{"index": {"_index":"location"}}
{"loc":"广东省","company":"腾讯","num":"A1001"}
{"index": {"_index":"location"}}
{"loc":"上海市","company":"Bilibili","num":"B1001"}
{"index": {"_index":"location"}}
{"loc":"浙江省","company":"阿里巴巴","num":"C1001"}

第二步:创建 enrich policy

enrich policy 一旦创建,就不能更新或者修改。

PUT /_enrich/policy/my-policy
{
  "match": {
    "indices": "location",  #source index 索引名,就是前面创建的 enrich data 对应的索引
    "match_field": "num", #source index 中的属性名,用于incoming documents 和 source index 匹配的属性,属性名一样都是 num
    "enrich_fields": ["loc"], #添加到 incoming documents 中的属性
    # 可选,过滤 source index 的文档,只有 loc.keyword 是上海市的 enrich data 才能将属性添加到 incoming documents 中
    "query": {
      "match": {
        "loc.keyword": "上海市"
      }
    }
  }
}

第三步:执行 enrich policy

当创建了 enrich policy 后,你可以通过 execute enrich policy API 去执行 enrich policy。当执行 enrich policy 后,会自动创建 enrich index。

直接将 incoming document 与 source index 中的文档匹配可能会很慢且占用大量资源。 为了加快处理速度,enrich processor 使用了 enrich index。
enrich index 包含来自 source index 的 enrich data,enrich index 具有一些特殊属性可帮助简化它们:

  • 它们是系统索引,这意味着它们由 Elasticsearch 在内部进行管理,仅适用于 enrich processor。
  • 它们始终以 .enrich- * 开头。
  • 它们是只读的,这意味着你不能直接更改它们。
  • 它们被强制合并以便快速检索。

当 source index 中新增或者修改了数据,只需要重新执行 enrich policy 就可以更改 enrich index,从而更新 enrich processor。

通过以下命令执行 enrich policy:

PUT /_enrich/policy/my-policy/_execute

查看自动创建的 enrich index:

GET _cat/indices/.enrich*

# 返回结果
green open .enrich-my-policy-1616136526661 Vxal9lLBSlKS5lmzMpFfwQ 1 3 1 0 13.4kb 3.3kb

我感觉 enrich policy 这里有个小 bug,当删除 enrich policy 时,例如删除的 enrich policy 为 my-policy-1,会同时删除 my-policy-1 的 enrich index 和 enrich policy ,但是如果原先还有个 my-policy-2(两个 enrich policy 在-之前是一样的),会把 my-policy-2 的 enrich index 也误删了(enrich policy 不删)。

第四步:在 ingest pipeline 使用 enrich processor

PUT _ingest/pipeline/loc-pipeline
{
  "processors": [
    {
      "enrich": {
        "policy_name": "my-policy", #引用前面创建的 enrich policy
        "field": "num",  # incoming document 中的属性名,用于和 source index 中的属性匹配值
        #在incoming document 中新增的属性,
        #包含在 enrich policy 中定义的 match_field 和 enrich_fields 的值
        "target_field": "enrich_loc" 
      }
    }
  ]
}

验证

使用 simulate 用来调试 ingest pipeline的效果,由于 source index 中匹配到的 loc.keyword 不是上海市,不会对这个文档进行处理:

POST _ingest/pipeline/loc-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "num": "A1001",
        "company": "腾讯"
      }
    }
  ]
}

# 返回结果
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "company" : "腾讯",
          "num" : "A1001"
        },
        "_ingest" : {
          "timestamp" : "2021-03-19T06:56:45.754486259Z"
        }
      }
    }
  ]
}

这个文档的 loc.keyword 是上海市,因此会添加上 enrich data 中指定的属性:

POST _ingest/pipeline/loc-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "num": "B1001",
        "company": "Bilibili"
      }
    }
  ]
}

# 返回结果
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "company" : "Bilibili",
          "enrich_loc" : {
            "loc" : "上海市",
            "num" : "B1001"
          },
          "num" : "B1001"
        },
        "_ingest" : {
          "timestamp" : "2021-03-19T06:56:29.393585306Z"
        }
      }
    }
  ]
}

在 simulate 调试成功之后,我们在插入文档的时候指定 ingest pipeline:

# 方式一:单条插入
POST origin-location/_doc?pipeline=loc-pipeline
{
  "num": "A1001",
  "company": "腾讯"
}

POST origin-location/_doc?pipeline=loc-pipeline
{
  "num": "B1001",
  "company": "Bilibili"
}

# 方式二:批量插入
POST _bulk?pipeline=loc-pipeline
{"index":{"_index":"origin-location"}}
{"num":"A1001","company":"腾讯"}
{"index":{"_index":"origin-location"}}
{"num":"B1001","company":"Bilibili"}

查看插入的结果:

GET origin-location/_search

#返回结果
{
  "took" : 12,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "origin-location",
        "_type" : "_doc",
        "_id" : "zXxLSXgBUc4opBV-QiOv",
        "_score" : 1.0,
        "_source" : {
          "num" : "A1001",
          "company" : "腾讯"
        }
      },
      {
        "_index" : "origin-location",
        "_type" : "_doc",
        "_id" : "znxLSXgBUc4opBV-SCPk",
        "_score" : 1.0,
        "_source" : {
          "num" : "B1001",
          "company" : "Bilibili",
          "enrich_loc" : {
            "loc" : "上海市",
            "num" : "B1001"
          }
        }
      }
    ]
  }
}

也可以指定索引默认使用的 ingest pipeline ,这样就不用每次在插入文档的时候指定 ingest pipeline了:

# 指定索引默认使用的 ingest pipeline
PUT origin-location2
{
  "settings": {
    "default_pipeline": "loc-pipeline"  
  }
}

# 插入数据
POST _bulk
{"index":{"_index":"origin-location2"}}
{"num":"A1001","company":"腾讯"}
{"index":{"_index":"origin-location2"}}
{"num":"B1001","company":"Bilibili"}

# 查看结果
GET origin-location2/_search

# 输出结果
{
  "took" : 8,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "origin-location2",
        "_type" : "_doc",
        "_id" : "CXxPSXgBUc4opBV-oyTJ",
        "_score" : 1.0,
        "_source" : {
          "num" : "A1001",
          "company" : "腾讯"
        }
      },
      {
        "_index" : "origin-location2",
        "_type" : "_doc",
        "_id" : "CnxPSXgBUc4opBV-oyTJ",
        "_score" : 1.0,
        "_source" : {
          "num" : "B1001",
          "company" : "Bilibili",
          "enrich_loc" : {
            "loc" : "上海市",
            "num" : "B1001"
          }
        }
      }
    ]
  }
}

另外还可以使用 index template,通过正则表达式的方式匹配多个索引,来指定索引使用的 ingest pipeline:

# 使用 index template
PUT _template/my-template
{
  "index_patterns": ["origin-*"],
  "settings": {
   "default_pipeline": "loc-pipeline"
  }
}

# 插入数据
POST _bulk
{"index":{"_index":"origin-location3"}}
{"num":"A1001","company":"腾讯"}
{"index":{"_index":"origin-location3"}}
{"num":"B1001","company":"Bilibili"}

# 查看结果
GET origin-location3/_search

# 输出结果
{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "origin-location3",
        "_type" : "_doc",
        "_id" : "XnxVSXgBUc4opBV-1yRp",
        "_score" : 1.0,
        "_source" : {
          "num" : "A1001",
          "company" : "腾讯"
        }
      },
      {
        "_index" : "origin-location3",
        "_type" : "_doc",
        "_id" : "X3xVSXgBUc4opBV-1yRp",
        "_score" : 1.0,
        "_source" : {
          "num" : "B1001",
          "company" : "Bilibili",
          "enrich_loc" : {
            "loc" : "上海市",
            "num" : "B1001"
          }
        }
      }
    ]
  }
}
相关实践学习
以电商场景为例搭建AI语义搜索应用
本实验旨在通过阿里云Elasticsearch结合阿里云搜索开发工作台AI模型服务,构建一个高效、精准的语义搜索系统,模拟电商场景,深入理解AI搜索技术原理并掌握其实现过程。
ElasticSearch 最新快速入门教程
本课程由千锋教育提供。全文搜索的需求非常大。而开源的解决办法Elasricsearch(Elastic)就是一个非常好的工具。目前是全文搜索引擎的首选。本系列教程由浅入深讲解了在CentOS7系统下如何搭建ElasticSearch,如何使用Kibana实现各种方式的搜索并详细分析了搜索的原理,最后讲解了在Java应用中如何集成ElasticSearch并实现搜索。  
目录
相关文章
|
Linux Go iOS开发
掌握Go语言:配置环境变量、深入理解GOPATH和GOROOT(1)
掌握Go语言:配置环境变量、深入理解GOPATH和GOROOT(1)
2129 0
|
消息中间件 架构师 安全
重新认识架构 — 不只是软件设计
通常情况下,人们对架构的认知仅限于在软件工程中的定义:架构主要指软件系统的结构设计,比如常见的 SOLID 准则、DDD 架构。一个良好的软件架构可以帮助团队更有效地进行软件开发,降低维护成本,提高系统的可扩展性和可维护性。这里的架构定义有更多元化的理解:架构不仅是对软件开发设计和流程规范的定义,也包含了参与架构设计的人员、以及项目过程中和架构有关的活动,都可以称为架构。 从广义角度来理解架构,意味着更全面的思考和新的融合。
137 0
|
3月前
|
传感器 人工智能 运维
如何在AR运维巡检中导入AI识别方案,达到智慧巡检目的
在工业运维巡检中,传统依赖人工的方式易出错且效率低。通过融合AR与AI技术,构建智慧巡检闭环,实现设备状态自动识别、预测性维护与高效协同。本文从场景锚定、知识沉淀、交互优化、反馈闭环、模型选型五大维度,解析AI识别方案的落地路径,助力工业智能化转型。
|
6月前
|
SQL 关系型数据库 分布式数据库
PolarDB开源数据库入门教程
PolarDB是阿里云推出的云原生数据库,基于PostgreSQL、MySQL和Oracle引擎构建,具备高性能、高扩展性和高可用性。其开源版采用计算与存储分离架构,支持快速弹性扩展和100%兼容PostgreSQL/MySQL。本文介绍了PolarDB的安装方法(Docker部署或源码编译)、基本使用(连接数据库、创建表等)及高级特性(计算节点扩展、存储自动扩容、并行查询等)。同时提供了性能优化建议和监控维护方法,帮助用户在生产环境中高效使用PolarDB。
2219 21
|
Java 数据库连接 mybatis
Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception
在进行springboot和mybatis遇到了这个错误 Servlet.service() for servlet [dispatcherServlet] in context with path [] th
Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception
|
测试技术 uml UED
软件需求管理:从获取到变更的全过程
【8月更文第20天】在软件开发项目中,需求管理是确保产品满足用户期望和业务目标的关键环节。本文将探讨软件需求管理的基本概念、需求获取的方法、需求分析与建模的实践、需求验证与确认的策略以及需求变更管理的最佳实践。
1120 5
|
数据可视化 数据挖掘 数据处理
开源埋点分析系统:洞察用户行为的新视角
在数字化浪潮中,了解用户行为和优化用户体验成为企业竞争力的关键。对于希望深入了解客户和推动业务增长的公司来说,埋点分析系统是不可或缺的工具。今天,我们要介绍的不仅是一个 ClkLog 埋点分析系统,而是一种全新的、开源的洞察方案,它能够帮助您捕捉每一个关键数据点,确保您的决策基于最准确的用户行为分析。
开源埋点分析系统:洞察用户行为的新视角
|
SQL Java 数据库连接
MyBatisPlus-聚合查询、分组查询及等值查询
MyBatisPlus-聚合查询、分组查询及等值查询
2576 0
|
Kubernetes Linux 虚拟化
一文详解容器技术简介和基本原理
本文全面阐述了容器技术的发展历程、关键技术、架构和当前的行业生态,特别是容器技术在云环境中的应用和演进。
西门子S7-1200编程实例,移位和循环移位指令如何使用?
西门子S7-1200的移位指令包括左移位指令和右移位指令,循环移位指令包括循环左移位指令和循环右移位指令。
西门子S7-1200编程实例,移位和循环移位指令如何使用?