Elasticsearch 8.X 小技巧:使用存储脚本优化数据索引与转换过程

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: Elasticsearch 8.X 小技巧:使用存储脚本优化数据索引与转换过程

1、引言

Elasticsearch 中,可以使用 Painless 脚本来实现一些非标准的处理结果。这些脚本可以直接嵌入到数据处理管道中,但为了使脚本与管道相互独立,还可以将脚本单独存储在 Elasticsearch 中,并在数据摄取管道(Ingest pipeline)中按需调用它们。

这种存储脚本的方式,咱们之前也有过介绍,Elasticsearch 中有个专有术语名词与之对应,叫:stored script 存储脚本。通过 stored script 方式,可以在不同的地方重复使用同一段脚本,而无需复制代码。

在Elasticsearch中使用 stored script 存储脚本是一种高效且灵活的方法,特别适用于那些需要在多个数据处理场景中重复使用相同逻辑的场合。通过这种方式,可以构建更加模块化、易于管理的数据处理管道。

2、Base64 解码的存储脚本实现

如下脚本的目的是将源数据中的字段从Base64格式转换为解码后的文本。

2.1 创建 Base64 解码脚本

PUT /_scripts/decodebase64
{
  "script": {
    "description": "Decode base64",
    "lang": "painless",
    "source": "def src=ctx[params['field']]; if (src == null) { return; } def target=params['target_field']; ctx[target]=src.decodeBase64();"
  }
}

脚本解读如下:

  • PUT /_scripts/decodebase64: 这部分指示Elasticsearch创建或更新一个名为decodebase64的脚本。
  • "script": 脚本的主体部分。
  • "description": 脚本的描述,说明了脚本的作用,即解码Base64。
  • "lang": 脚本的编写语言,这里使用的是Elasticsearch的Painless脚本语言。
  • "source": 脚本的具体内容。这个脚本接受一个字段名作为输入(params['field']),检查是否为空,如果不为空,则将其Base64解码并存储在指定的目标字段(params['target_field'])。

这个脚本可以在Elasticsearch的摄取管道中使用,用于在数据索引之前动态地对字段进行Base64解码。

2.2 获取存储脚本

如下脚本仅验证,实战中可忽略。

GET /_scripts/decodebase64

召回结果如下:

{
  "_id": "decodebase64",
  "found": true,
  "script": {
    "lang": "painless",
    "source": "def src=ctx[params['field']]; if (src == null) { return; } def target=params['target_field']; ctx[target]=src.decodeBase64();"
  }
}

注意:之前咱们很少这么用。看细节,上面的召回结果有  "_id": "decodebase64", 你关注一下,一会就能用到

2.3 创建使用Base64 解码存储脚本的管道

PUT /_ingest/pipeline/decodebase64
{
  "description": "Decode hash values",
  "processors": [
    {
      "script": {
        "id": "decodebase64",
        "params": {
          "field": "name_base64",
          "target_field": "name"
        }
      }
    }
  ]
}

上述代码创建了一个名为 decodebase64 的 Elasticsearch 摄取管道,其功能是使用存储的脚本 decodebase64 将字段 name_base64 中的 Base64 编码值解码,并将解码后的文本存储到 name 字段中。

和咱们之前讲的不同的地方、灵活的地方在于:field 和 target_field 变成了变量了,可以灵活按照项目需求替换之。

2.4 批量写入数据的时候同时指定 pipeline

POST /fruits/_bulk?pipeline=decodebase64
{"index":{"_id":"1"}}
{"name_base64":"QXBwbGU="}
{"index":{"_id":"2"}}
{"name_base64":"QW5hbmFz"}
{"index":{"_id":"3"}}
{"name_base64":"Q2hlcnJ5"}

如上 bulk 批量写入的时候指定 pipeline 的方式,咱们之前也少有讲解。

GET fruits/_search

结果如下图所示:

我们清晰的看到,咱们写入的 name_base64 字段借助我们创建的管道、基于存储脚本解码为 name字段值。

不着急下结论,咱们再看一组例子。

3、16进制解码的存储脚本实现

步骤参见第2部分,咱们只讲重点。

3.1 创建16进制解码存储脚本

如下存储脚本的目的:在Elasticsearch中创建并存储一个名为decodehex的脚本,该脚本用于将HEX(十六进制)编码的字符串转换为普通文本。

PUT /_scripts/decodehex
{
  "script": {
    "description": "Decode HEX",
    "lang": "painless",
    "source": "def src=ctx[params['field']]; if (src == null) { return; } def target=params['target_field']; StringBuilder sb = new StringBuilder(); for (int i = 0; i < src.length(); i += 2) { String byteStr = src.substring(i, i + 2); char byteChar = (char) Integer.parseInt(byteStr, 16); sb.append(byteChar) } ctx[target] = sb.toString();"
  }
}

脚本解读如下:

  • PUT /_scripts/decodehex: 这部分指示Elasticsearch创建或更新一个名为decodehex的脚本。
  • script: 脚本的主体部分。
  • description: 脚本的描述,说明了脚本的作用,即解码HEX字符串。
  • lang: 脚本的编写语言,这里使用的是Elasticsearch的Painless脚本语言。
  • source: 脚本的具体内容。这个脚本接受一个字段名作为输入(params['field']),检查是否为空,如果不为空,则将其HEX编码的内容转换为普通文本并存储在指定的目标字段(params['target_field'])。

如上脚本可以在Elasticsearch的摄取管道中使用,用于在数据索引之前动态地对字段进行 HEX 解码。

3.2 获取16进制解码存储脚本

如下脚本仅验证,实战中可忽略。

GET /_scripts/decodehex

召回结果如下:

3.3 创建使用16进制解码脚本的管道

PUT /_ingest/pipeline/decodehex
{
  "description": "Decode hash values",
  "processors": [
    {
      "script": {
        "id": "decodehex",
        "params": {
          "field": "color_hex",
          "target_field": "color"
        }
      }
    }
  ]
}

该管道的功能是使用存储的脚本 decodehex 来处理数据:它会取 color_hex 字段中的HEX(十六进制)编码字符串,将其解码成普通文本,并将解码后的结果存储到 color 字段中。这个过程主要用于在将数据索引到 Elasticsearch 之前自动进行数据转换和预处理。

同样,灵活的地方在于:field、target_field 是变量。

3.4 批量写入数据的时候同时指定 pipeline

POST /fruits_ext/_bulk?pipeline=decodehex
{"index":{"_id":"1"}}
{"color_hex":"477265656e"}
{"index":{"_id":"2"}}
{"color_hex":"59656c6c6f77"}
{"index":{"_id":"3"}}
{"color_hex":"526564"}

如上 bulk 批量写入的时候指定 pipeline 的方式,咱们之前也少有讲解。

GET fruits_ext/_search

结果如下图所示:

当然,第2部分、第3部分的存储脚本使用可以灵活的整合为一部分,如下所示。

PUT /_ingest/pipeline/decodehashes
{
  "description": "Decode hash values",
  "processors": [
    {
      "script": {
        "id": "decodebase64",
        "params": {
          "field": "name_base64",
          "target_field": "name"
        }
      }
    },
    {
      "script": {
        "id": "decodehex",
        "params": {
          "field": "color_hex",
          "target_field": "color"
        }
      }
    }
  ]
}

批量构建数据结果:

POST /fruits_all/_bulk?pipeline=decodehashes
{"index":{"_id":"1"}}
{"name_base64":"QXBwbGU=","color_hex":"477265656e"}
{"index":{"_id":"2"}}
{"name_base64":"QW5hbmFz","color_hex":"59656c6c6f77"}
{"index":{"_id":"3"}}
{"name_base64":"Q2hlcnJ5","color_hex":"526564"}

执行检索效果:

4、小结

我们一起探索了如何在Elasticsearch中创建并存储脚本,以及如何检索这些脚本,以确认它们的 id 和内容。我们还学习了如何在数据处理的摄取管道中调用这些存储的脚本。

通过这种方法,你可以有效地节省存储空间,并减少因重复编写相同脚本而可能出现的错误。简而言之,你只需编写和存储一次脚本,就可以在多个地方反复使用,这无疑提高了工作效率,同时也使得数据处理过程更加流畅和可靠。

小结一下使用存储脚本 stored script 的好处

  • 其一,可以为常见的数据转换或处理任务创建通用脚本,并在多个不同的管道中引用它们。
  • 其二,这不仅节省了开发时间,还有助于保持代码的一致性,因为所有的修改都集中在一个地方进行。
  • 其三,这种方法还提高了管道的可读性和维护性,因为管道本身不再充斥着复杂的脚本代码,而是通过引用存储脚本的方式来实现相同的功能。

5、说明

文章翻译自:

https://toughcoding.net/elasticsearch-use-stored-script-for-your-ingestion-pipeline

作者:Tomasz Dzierżanowski

铭毅天下做了完整的细化解读和重新梳理。

推荐阅读


更短时间更快习得更多干货!

和全球 近2000+ Elastic 爱好者一起精进!

比同事抢先一步学习进阶干货!


相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
1天前
|
消息中间件 存储 关系型数据库
【微服务】mysql + elasticsearch数据双写设计与实现
【微服务】mysql + elasticsearch数据双写设计与实现
|
5天前
|
canal NoSQL 关系型数据库
实时计算 Flink版产品使用合集之如何在ElasticSearch中查看同步的数据
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
12天前
|
SQL 关系型数据库 数据库
实时计算 Flink版产品使用合集之将数据写入Elasticsearch时,若Elasticsearch中的字段类型为date,对应的SQL类型应该是什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
12天前
|
SQL 监控 API
实时计算 Flink版产品使用合集之可以用来同步数据到 Elasticsearch(ES)吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
实时计算 Flink版产品使用合集之可以用来同步数据到 Elasticsearch(ES)吗
|
14天前
|
存储 缓存 搜索推荐
深入理解Elasticsearch倒排索引原理与优化策略
总之,Elasticsearch的倒排索引是其高效全文搜索的核心。为了提高性能和可伸缩性,Elasticsearch采用了多种优化策略,包括压缩、分片、合并、位集合和近实时搜索等。这些策略使Elasticsearch成为处理大规模文本数据的强大工具。
36 0
|
14天前
|
存储 机器学习/深度学习 搜索推荐
Elasticsearch 8.X 向量检索和普通检索能否实现组合检索?如何实现?
Elasticsearch 8.X 向量检索和普通检索能否实现组合检索?如何实现?
29 3
|
14天前
|
运维 安全 API
Elasticsearch 悬挂索引解析与管理指南
Elasticsearch 悬挂索引解析与管理指南
34 7
|
14天前
|
安全 API 数据安全/隐私保护
Elasticsearch 通过索引阻塞实现数据保护深入解析
Elasticsearch 通过索引阻塞实现数据保护深入解析
26 4
|
14天前
|
存储 监控 安全
Elasticsearch 8.X 集群 SSL 证书到期了,怎么更换?
Elasticsearch 8.X 集群 SSL 证书到期了,怎么更换?
51 3
|
14天前
|
API 索引
近期,几个典型 Elasticsearch 8.X 问题及方案探讨
近期,几个典型 Elasticsearch 8.X 问题及方案探讨
32 3

热门文章

最新文章