· 更多精彩内容,请下载阅读全本《Elastic Stack实战手册》
创作人:李增胜
Elastic 提供了三种方式进行数据加工处理:Logstash、Beats Processors 以及 Ingest Pipeline,本文着重介绍 Ingest Pipeline,以下比较了 Logstash 与 Ingest Pipeline的一些区别,便于在实际业务场景中选择:
种类 | 部署 | 数据缓冲 | 数据处理 | 数据源 |
---|---|---|---|---|
Logstash | 需要另外部署,增加复杂性 | 采用队列机制缓冲数据,多队列支持 | 支持大量processors,远超 ingest | 支持外部数据源,如MYSQL、Kafka、Beats等 |
Ingest pipeline | 无需另外部署,易于扩展 | 无缓冲策略 | 支持超过30种processors | Ingest 也可和 Beats 或者 Logstash 解决特定场景数据源问题 |
总结:
- 如果业务场景 Ingest pipeline 已经能处理完成,则无需使用 Logstash ,相反,如果业务处理数据场景要支持外部数据源,则选择 Logstash
- 如果业务场景需要缓冲数据,则采用 Logstash 较优
- 如果数据处理完成后需要输出到非 Elasticsearch 内部,则采用 Logstash
- 在简化配置方便,如果想配置简单,则选择 Elasticsearch ingest pipeline 即可
显然,Ingest pipeline 并非 Logstatsh 的替代品,需要根据自己的业务处理数据的要求和架构设计来选择对应的技术,并非二选一,也可以同时使用,对处理不同数据采用不同的技术架构。
Kibana Dev Tools 管理 Pipeline
Ingest Pipeline
用于预处理数据,由 Elasticsearch Ingest Node 节点负责运行处理,如需要系统性能提升可单独部署 Ingest Node 节点
优点:
- 由 Ingest Node 节点负责处理,职责清晰
- 更多 Processors 支持,扩展性强
- 轻量级,覆盖了 Logstash 大多常用场景
Ingest Pipeline 是一系列处理管道,由一系列的 Processors 组成处理,先来看下 pipeline 的处理过程:
在 Kibana 中也可以创建 Ingest pipeline,在稍微章节给出示例。
常用 的 Processors 如下
更多 Pipeline Processors 参考更多; https://www.elastic.co/guide/en/elasticsearch/reference/master/processors.html
Trim
去除空格,如果是字符串类型的数组,数组中所有字符串都会被替换空格处理
Split
切分字符串,使用指定切分符,切分字符串为数组结构,只作用与字符串类型
Rename
重命名字段
Foreach
对一组数据进行相同的预处理,可以使用 Foreach
Lowercase / Uppercase
对字段进行大小写转换
Script
使用脚本语言进行数据预处理
Gsub
对字符串进行替换
Append
添加数据到数组
Set
设置字段值
Remove
移除字段
Trim
去除字符串中的空格
PUT _ingest/pipeline/trim_pipeline { "processors": [ { "foreach": { "field": "message", "processor": { "trim": { "field": "_ingest._value" } } } } ] } POST _ingest/pipeline/trim_pipeline/_simulate { "docs": [ { "_source": { "message": [ "car222 ", " auto2222 " ] } } ] } #返回: { "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "message" : [ "car222", "auto2222" ] }, "_ingest" : { "_value" : null, "timestamp" : "2021-04-28T13:19:13.542743Z" } } } ] }
AI 代码解读
Split / Foreach
切分字符串,使用指定切分符,切分字符串为数组结构,只作用于字符串类型
PUT _ingest/pipeline/split_pipeline { "processors": [ { "foreach": { "field": "message", "processor": { "split": { "field": "_ingest._value", "separator": " " } } } } ] } #测试 POST _ingest/pipeline/split_pipeline/_simulate { "docs": [ { "_source": { "message": [ "car222 aaa", " auto2222 aaaa bbb" ] } } ] } #返回,可以看到 message 按照空格切分为了多个字符串数组 { "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "message" : [ [ "car222", "aaa" ], [ "", "auto2222", "aaaa", "bbb" ] ] }, "_ingest" : { "_value" : null, "timestamp" : "2021-04-28T13:28:20.762312Z" } } } ] }
AI 代码解读
Rename
重命名一个字段, rename 往往和 reindex 结合使用
POST goods_info_comment_message/_bulk {"index":{"_id":1}} {"message":"美 国苹果 "} {"index":{"_id":2}} {"message":"山东 苹果 "} #定义 rename_pipeline PUT _ingest/pipeline/rename_pipeline { "processors": [ { "rename": { "field": "message", "target_field": "message_new" } } ] } #重建 index POST _reindex { "source": { "index": "goods_info_comment_message" }, "dest": { "index": "goods_info_comment_message_new", "pipeline": "rename_pipeline" } } #查询 mapping GET goods_info_comment_message_new/_mapping #返回 { "goods_info_comment_message_new" : { "mappings" : { "properties" : { "message_new" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } } } } } }
AI 代码解读
Lowercase / Uppercase
将字符串修改为大写或者小写
PUT _ingest/pipeline/lowercase_pipeline { "description": "lowercase processor", "processors": [ { "lowercase": { "field": "message" } } ] } #测试,部分字符大写 POST _ingest/pipeline/lowercase_pipeline/_simulate { "docs": [ { "_source": { "message": [ "CAr222 aaa", " auto2222 aaaa Bbb" ] } } ] } #结果,全部输出为小写 { "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "message" : [ "car222 aaa", " auto2222 aaaa bbb" ] }, "_ingest" : { "timestamp" : "2021-04-28T15:12:10.041308Z" } } } ] }
AI 代码解读
Remove
移除已经存在的字段
#定义remove pipelint PUT _ingest/pipeline/remove_pipeline { "description": "remove processor", "processors": [ { "remove": { "field": "message" } } ] } #测试 POST _ingest/pipeline/remove_pipeline/_simulate { "docs": [ { "_source": { "message": [ "CAr222 aaa", " auto2222 aaaa Bbb" ] } } ] } #返回,可以看到message字段已经被移除 { "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { }, "_ingest" : { "timestamp" : "2021-04-28T15:15:27.811516Z" } } } ] }
AI 代码解读
Set
给已有字段进行赋值
PUT _ingest/pipeline/set_pipeline { "description": "set processor", "processors": [ { "set": { "field": "message", "value": "this is a new message" } } ] } POST _ingest/pipeline/set_pipeline/_simulate { "docs": [ { "_source": { "message": "this" } } ] } #返回 { "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "message" : "this is a new message" }, "_ingest" : { "timestamp" : "2021-04-28T15:21:28.928512Z" } } } ] }
AI 代码解读
Kibana Dev Tools 管理 Pipeline
下面介绍如何在 kibana 中通过界面来创建 Pipeline,打开 Kibana 首页:
选择 Ingest Node Pipelines,右边会展示已有的 Pipeline 列表
选择新创建 Ppipeline
我们选择创建一个 lowercase processor
点击 Add documents 进行相关测试
添加测试文档:
[ { "_index": "index_lowercase", "_id": "1", "_source": { "message": "This is a Test" } } ]
AI 代码解读
可以看到,测试成功,字符串全部变为了小写