1、引言
在 Elasticsearch 中,可以使用 Painless 脚本来实现一些非标准的处理结果。这些脚本可以直接嵌入到数据处理管道中,但为了使脚本与管道相互独立,还可以将脚本单独存储在 Elasticsearch 中,并在数据摄取管道(Ingest pipeline)中按需调用它们。
这种存储脚本的方式,咱们之前也有过介绍,Elasticsearch 中有个专有术语名词与之对应,叫:stored script 存储脚本。通过 stored script 方式,可以在不同的地方重复使用同一段脚本,而无需复制代码。
在Elasticsearch中使用 stored script 存储脚本是一种高效且灵活的方法,特别适用于那些需要在多个数据处理场景中重复使用相同逻辑的场合。通过这种方式,可以构建更加模块化、易于管理的数据处理管道。
2、Base64 解码的存储脚本实现
如下脚本的目的是将源数据中的字段从Base64格式转换为解码后的文本。
2.1 创建 Base64 解码脚本
PUT /_scripts/decodebase64 { "script": { "description": "Decode base64", "lang": "painless", "source": "def src=ctx[params['field']]; if (src == null) { return; } def target=params['target_field']; ctx[target]=src.decodeBase64();" } }
脚本解读如下:
- PUT /_scripts/decodebase64: 这部分指示Elasticsearch创建或更新一个名为decodebase64的脚本。
- "script": 脚本的主体部分。
- "description": 脚本的描述,说明了脚本的作用,即解码Base64。
- "lang": 脚本的编写语言,这里使用的是Elasticsearch的Painless脚本语言。
- "source": 脚本的具体内容。这个脚本接受一个字段名作为输入(params['field']),检查是否为空,如果不为空,则将其Base64解码并存储在指定的目标字段(params['target_field'])。
这个脚本可以在Elasticsearch的摄取管道中使用,用于在数据索引之前动态地对字段进行Base64解码。
2.2 获取存储脚本
如下脚本仅验证,实战中可忽略。
GET /_scripts/decodebase64
召回结果如下:
{ "_id": "decodebase64", "found": true, "script": { "lang": "painless", "source": "def src=ctx[params['field']]; if (src == null) { return; } def target=params['target_field']; ctx[target]=src.decodeBase64();" } }
注意:之前咱们很少这么用。看细节,上面的召回结果有 "_id": "decodebase64", 你关注一下,一会就能用到!
2.3 创建使用Base64 解码存储脚本的管道
PUT /_ingest/pipeline/decodebase64 { "description": "Decode hash values", "processors": [ { "script": { "id": "decodebase64", "params": { "field": "name_base64", "target_field": "name" } } } ] }
上述代码创建了一个名为 decodebase64 的 Elasticsearch 摄取管道,其功能是使用存储的脚本 decodebase64 将字段 name_base64 中的 Base64 编码值解码,并将解码后的文本存储到 name 字段中。
和咱们之前讲的不同的地方、灵活的地方在于:field 和 target_field 变成了变量了,可以灵活按照项目需求替换之。
2.4 批量写入数据的时候同时指定 pipeline
POST /fruits/_bulk?pipeline=decodebase64 {"index":{"_id":"1"}} {"name_base64":"QXBwbGU="} {"index":{"_id":"2"}} {"name_base64":"QW5hbmFz"} {"index":{"_id":"3"}} {"name_base64":"Q2hlcnJ5"}
如上 bulk 批量写入的时候指定 pipeline 的方式,咱们之前也少有讲解。
GET fruits/_search
结果如下图所示:
我们清晰的看到,咱们写入的 name_base64 字段借助我们创建的管道、基于存储脚本解码为 name字段值。
不着急下结论,咱们再看一组例子。
3、16进制解码的存储脚本实现
步骤参见第2部分,咱们只讲重点。
3.1 创建16进制解码存储脚本
如下存储脚本的目的:在Elasticsearch中创建并存储一个名为decodehex的脚本,该脚本用于将HEX(十六进制)编码的字符串转换为普通文本。
PUT /_scripts/decodehex { "script": { "description": "Decode HEX", "lang": "painless", "source": "def src=ctx[params['field']]; if (src == null) { return; } def target=params['target_field']; StringBuilder sb = new StringBuilder(); for (int i = 0; i < src.length(); i += 2) { String byteStr = src.substring(i, i + 2); char byteChar = (char) Integer.parseInt(byteStr, 16); sb.append(byteChar) } ctx[target] = sb.toString();" } }
脚本解读如下:
- PUT /_scripts/decodehex: 这部分指示Elasticsearch创建或更新一个名为decodehex的脚本。
- script: 脚本的主体部分。
- description: 脚本的描述,说明了脚本的作用,即解码HEX字符串。
- lang: 脚本的编写语言,这里使用的是Elasticsearch的Painless脚本语言。
- source: 脚本的具体内容。这个脚本接受一个字段名作为输入(params['field']),检查是否为空,如果不为空,则将其HEX编码的内容转换为普通文本并存储在指定的目标字段(params['target_field'])。
如上脚本可以在Elasticsearch的摄取管道中使用,用于在数据索引之前动态地对字段进行 HEX 解码。
3.2 获取16进制解码存储脚本
如下脚本仅验证,实战中可忽略。
GET /_scripts/decodehex
召回结果如下:
3.3 创建使用16进制解码脚本的管道
PUT /_ingest/pipeline/decodehex { "description": "Decode hash values", "processors": [ { "script": { "id": "decodehex", "params": { "field": "color_hex", "target_field": "color" } } } ] }
该管道的功能是使用存储的脚本 decodehex 来处理数据:它会取 color_hex 字段中的HEX(十六进制)编码字符串,将其解码成普通文本,并将解码后的结果存储到 color 字段中。这个过程主要用于在将数据索引到 Elasticsearch 之前自动进行数据转换和预处理。
同样,灵活的地方在于:field、target_field 是变量。
3.4 批量写入数据的时候同时指定 pipeline
POST /fruits_ext/_bulk?pipeline=decodehex {"index":{"_id":"1"}} {"color_hex":"477265656e"} {"index":{"_id":"2"}} {"color_hex":"59656c6c6f77"} {"index":{"_id":"3"}} {"color_hex":"526564"}
如上 bulk 批量写入的时候指定 pipeline 的方式,咱们之前也少有讲解。
GET fruits_ext/_search
结果如下图所示:
当然,第2部分、第3部分的存储脚本使用可以灵活的整合为一部分,如下所示。
PUT /_ingest/pipeline/decodehashes { "description": "Decode hash values", "processors": [ { "script": { "id": "decodebase64", "params": { "field": "name_base64", "target_field": "name" } } }, { "script": { "id": "decodehex", "params": { "field": "color_hex", "target_field": "color" } } } ] }
批量构建数据结果:
POST /fruits_all/_bulk?pipeline=decodehashes {"index":{"_id":"1"}} {"name_base64":"QXBwbGU=","color_hex":"477265656e"} {"index":{"_id":"2"}} {"name_base64":"QW5hbmFz","color_hex":"59656c6c6f77"} {"index":{"_id":"3"}} {"name_base64":"Q2hlcnJ5","color_hex":"526564"}
执行检索效果:
4、小结
我们一起探索了如何在Elasticsearch中创建并存储脚本,以及如何检索这些脚本,以确认它们的 id 和内容。我们还学习了如何在数据处理的摄取管道中调用这些存储的脚本。
通过这种方法,你可以有效地节省存储空间,并减少因重复编写相同脚本而可能出现的错误。简而言之,你只需编写和存储一次脚本,就可以在多个地方反复使用,这无疑提高了工作效率,同时也使得数据处理过程更加流畅和可靠。
小结一下使用存储脚本 stored script 的好处:
- 其一,可以为常见的数据转换或处理任务创建通用脚本,并在多个不同的管道中引用它们。
- 其二,这不仅节省了开发时间,还有助于保持代码的一致性,因为所有的修改都集中在一个地方进行。
- 其三,这种方法还提高了管道的可读性和维护性,因为管道本身不再充斥着复杂的脚本代码,而是通过引用存储脚本的方式来实现相同的功能。
5、说明
文章翻译自:
https://toughcoding.net/elasticsearch-use-stored-script-for-your-ingestion-pipeline
作者:Tomasz Dzierżanowski
铭毅天下做了完整的细化解读和重新梳理。
推荐阅读
更短时间更快习得更多干货!
和全球 近2000+ Elastic 爱好者一起精进!
比同事抢先一步学习进阶干货!