· 更多精彩内容,请下载阅读全本《Elastic Stack实战手册》
创作人:李增胜
Elastic 提供了三种方式进行数据加工处理:Logstash、Beats Processors 以及 Ingest Pipeline,本文着重介绍 Ingest Pipeline,以下比较了 Logstash 与 Ingest Pipeline的一些区别,便于在实际业务场景中选择:
种类 | 部署 | 数据缓冲 | 数据处理 | 数据源 |
---|---|---|---|---|
Logstash | 需要另外部署,增加复杂性 | 采用队列机制缓冲数据,多队列支持 | 支持大量processors,远超 ingest | 支持外部数据源,如MYSQL、Kafka、Beats等 |
Ingest pipeline | 无需另外部署,易于扩展 | 无缓冲策略 | 支持超过30种processors | Ingest 也可和 Beats 或者 Logstash 解决特定场景数据源问题 |
总结:
- 如果业务场景 Ingest pipeline 已经能处理完成,则无需使用 Logstash ,相反,如果业务处理数据场景要支持外部数据源,则选择 Logstash
- 如果业务场景需要缓冲数据,则采用 Logstash 较优
- 如果数据处理完成后需要输出到非 Elasticsearch 内部,则采用 Logstash
- 在简化配置方便,如果想配置简单,则选择 Elasticsearch ingest pipeline 即可
显然,Ingest pipeline 并非 Logstatsh 的替代品,需要根据自己的业务处理数据的要求和架构设计来选择对应的技术,并非二选一,也可以同时使用,对处理不同数据采用不同的技术架构。
Kibana Dev Tools 管理 Pipeline
Ingest Pipeline
用于预处理数据,由 Elasticsearch Ingest Node 节点负责运行处理,如需要系统性能提升可单独部署 Ingest Node 节点
优点:
- 由 Ingest Node 节点负责处理,职责清晰
- 更多 Processors 支持,扩展性强
- 轻量级,覆盖了 Logstash 大多常用场景
Ingest Pipeline 是一系列处理管道,由一系列的 Processors 组成处理,先来看下 pipeline 的处理过程:
在 Kibana 中也可以创建 Ingest pipeline,在稍微章节给出示例。
常用 的 Processors 如下
更多 Pipeline Processors 参考更多; https://www.elastic.co/guide/en/elasticsearch/reference/master/processors.html
Trim
去除空格,如果是字符串类型的数组,数组中所有字符串都会被替换空格处理
Split
切分字符串,使用指定切分符,切分字符串为数组结构,只作用与字符串类型
Rename
重命名字段
Foreach
对一组数据进行相同的预处理,可以使用 Foreach
Lowercase / Uppercase
对字段进行大小写转换
Script
使用脚本语言进行数据预处理
Gsub
对字符串进行替换
Append
添加数据到数组
Set
设置字段值
Remove
移除字段
Trim
去除字符串中的空格
PUT _ingest/pipeline/trim_pipeline
{
"processors": [
{
"foreach": {
"field": "message",
"processor": {
"trim": {
"field": "_ingest._value"
}
}
}
}
]
}
POST _ingest/pipeline/trim_pipeline/_simulate
{
"docs": [
{
"_source": {
"message": [
"car222 ",
" auto2222 "
]
}
}
]
}
#返回:
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"message" : [
"car222",
"auto2222"
]
},
"_ingest" : {
"_value" : null,
"timestamp" : "2021-04-28T13:19:13.542743Z"
}
}
}
]
}
Split / Foreach
切分字符串,使用指定切分符,切分字符串为数组结构,只作用于字符串类型
PUT _ingest/pipeline/split_pipeline
{
"processors": [
{
"foreach": {
"field": "message",
"processor": {
"split": {
"field": "_ingest._value",
"separator": " "
}
}
}
}
]
}
#测试
POST _ingest/pipeline/split_pipeline/_simulate
{
"docs": [
{
"_source": {
"message": [
"car222 aaa",
" auto2222 aaaa bbb"
]
}
}
]
}
#返回,可以看到 message 按照空格切分为了多个字符串数组
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"message" : [
[
"car222",
"aaa"
],
[
"",
"auto2222",
"aaaa",
"bbb"
]
]
},
"_ingest" : {
"_value" : null,
"timestamp" : "2021-04-28T13:28:20.762312Z"
}
}
}
]
}
Rename
重命名一个字段, rename 往往和 reindex 结合使用
POST goods_info_comment_message/_bulk
{"index":{"_id":1}}
{"message":"美 国苹果 "}
{"index":{"_id":2}}
{"message":"山东 苹果 "}
#定义 rename_pipeline
PUT _ingest/pipeline/rename_pipeline
{
"processors": [
{
"rename": {
"field": "message",
"target_field": "message_new"
}
}
]
}
#重建 index
POST _reindex
{
"source": {
"index": "goods_info_comment_message"
},
"dest": {
"index": "goods_info_comment_message_new",
"pipeline": "rename_pipeline"
}
}
#查询 mapping
GET goods_info_comment_message_new/_mapping
#返回
{
"goods_info_comment_message_new" : {
"mappings" : {
"properties" : {
"message_new" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
}
}
}
Lowercase / Uppercase
将字符串修改为大写或者小写
PUT _ingest/pipeline/lowercase_pipeline
{
"description": "lowercase processor",
"processors": [
{
"lowercase": {
"field": "message"
}
}
]
}
#测试,部分字符大写
POST _ingest/pipeline/lowercase_pipeline/_simulate
{
"docs": [
{
"_source": {
"message": [
"CAr222 aaa",
" auto2222 aaaa Bbb"
]
}
}
]
}
#结果,全部输出为小写
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"message" : [
"car222 aaa",
" auto2222 aaaa bbb"
]
},
"_ingest" : {
"timestamp" : "2021-04-28T15:12:10.041308Z"
}
}
}
]
}
Remove
移除已经存在的字段
#定义remove pipelint
PUT _ingest/pipeline/remove_pipeline
{
"description": "remove processor",
"processors": [
{
"remove": {
"field": "message"
}
}
]
}
#测试
POST _ingest/pipeline/remove_pipeline/_simulate
{
"docs": [
{
"_source": {
"message": [
"CAr222 aaa",
" auto2222 aaaa Bbb"
]
}
}
]
}
#返回,可以看到message字段已经被移除
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : { },
"_ingest" : {
"timestamp" : "2021-04-28T15:15:27.811516Z"
}
}
}
]
}
Set
给已有字段进行赋值
PUT _ingest/pipeline/set_pipeline
{
"description": "set processor",
"processors": [
{
"set": {
"field": "message",
"value": "this is a new message"
}
}
]
}
POST _ingest/pipeline/set_pipeline/_simulate
{
"docs": [
{
"_source": {
"message": "this"
}
}
]
}
#返回
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"message" : "this is a new message"
},
"_ingest" : {
"timestamp" : "2021-04-28T15:21:28.928512Z"
}
}
}
]
}
Kibana Dev Tools 管理 Pipeline
下面介绍如何在 kibana 中通过界面来创建 Pipeline,打开 Kibana 首页:
选择 Ingest Node Pipelines,右边会展示已有的 Pipeline 列表
选择新创建 Ppipeline
我们选择创建一个 lowercase processor
点击 Add documents 进行相关测试
添加测试文档:
[
{
"_index": "index_lowercase",
"_id": "1",
"_source": {
"message": "This is a Test"
}
}
]
可以看到,测试成功,字符串全部变为了小写