Ingest pipelines—Elastic Stack 实战手册-阿里云开发者社区

开发者社区> Elasticsearch 技术团队> 正文
登录阅读全文

Ingest pipelines—Elastic Stack 实战手册

简介: 本文着重介绍 Ingest Pipeline,以下比较了 Logstash 与 Ingest Pipeline的一些区别,便于在实际业务场景中选择

970X90.png

· 更多精彩内容,请下载阅读全本《Elastic Stack实战手册》

· 加入创作人行列,一起交流碰撞,参与技术圈年度盛事吧

创作人:李增胜

Elastic 提供了三种方式进行数据加工处理:Logstash、Beats Processors 以及 Ingest Pipeline,本文着重介绍 Ingest Pipeline,以下比较了 Logstash 与 Ingest Pipeline的一些区别,便于在实际业务场景中选择:

种类部署数据缓冲数据处理数据源
Logstash需要另外部署,增加复杂性采用队列机制缓冲数据,多队列支持支持大量processors,远超 ingest支持外部数据源,如MYSQL、Kafka、Beats等
Ingest pipeline无需另外部署,易于扩展无缓冲策略支持超过30种processorsIngest 也可和 Beats 或者 Logstash 解决特定场景数据源问题

总结:

  • 如果业务场景 Ingest pipeline 已经能处理完成,则无需使用 Logstash ,相反,如果业务处理数据场景要支持外部数据源,则选择 Logstash
  • 如果业务场景需要缓冲数据,则采用 Logstash 较优
  • 如果数据处理完成后需要输出到非 Elasticsearch 内部,则采用 Logstash
  • 在简化配置方便,如果想配置简单,则选择 Elasticsearch ingest pipeline 即可

显然,Ingest pipeline 并非 Logstatsh 的替代品,需要根据自己的业务处理数据的要求和架构设计来选择对应的技术,并非二选一,也可以同时使用,对处理不同数据采用不同的技术架构。

Kibana Dev Tools 管理 Pipeline

Ingest Pipeline

用于预处理数据,由 Elasticsearch Ingest Node 节点负责运行处理,如需要系统性能提升可单独部署 Ingest Node 节点

优点:

  • 由 Ingest Node 节点负责处理,职责清晰
  • 更多 Processors 支持,扩展性强
  • 轻量级,覆盖了 Logstash 大多常用场景

Ingest Pipeline 是一系列处理管道,由一系列的 Processors 组成处理,先来看下 pipeline 的处理过程:

1.png

在 Kibana 中也可以创建 Ingest pipeline,在稍微章节给出示例。

常用 的 Processors 如下

更多 Pipeline Processors 参考更多;https://www.elastic.co/guide/en/elasticsearch/reference/master/processors.html

Trim

去除空格,如果是字符串类型的数组,数组中所有字符串都会被替换空格处理

Split

切分字符串,使用指定切分符,切分字符串为数组结构,只作用与字符串类型

Rename

重命名字段

Foreach

对一组数据进行相同的预处理,可以使用 Foreach

Lowercase / Uppercase

对字段进行大小写转换

Script

使用脚本语言进行数据预处理

Gsub

对字符串进行替换

Append

添加数据到数组

Set

设置字段值

Remove

移除字段

Trim

去除字符串中的空格

PUT _ingest/pipeline/trim_pipeline
{
  "processors": [
    {
      "foreach": {
        "field": "message",
        "processor": {
          "trim": {
            "field": "_ingest._value"
          }
        }
      }
    }
  ]
}

POST _ingest/pipeline/trim_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "car222 ",
          " auto2222 "
        ]
      }
    }
  ]
}

#返回:
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : [
            "car222",
            "auto2222"
          ]
        },
        "_ingest" : {
          "_value" : null,
          "timestamp" : "2021-04-28T13:19:13.542743Z"
        }
      }
    }
  ]
}

Split / Foreach

切分字符串,使用指定切分符,切分字符串为数组结构,只作用于字符串类型

PUT _ingest/pipeline/split_pipeline
{
  "processors": [
    {
      "foreach": {
        "field": "message",
        "processor": {
          "split": {
            "field": "_ingest._value",
            "separator": " "
          }
        }
      }
    }
  ]
}

#测试
POST _ingest/pipeline/split_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "car222 aaa",
          " auto2222 aaaa bbb"
        ]
      }
    }
  ]
}
#返回,可以看到 message 按照空格切分为了多个字符串数组
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : [
            [
              "car222",
              "aaa"
            ],
            [
              "",
              "auto2222",
              "aaaa",
              "bbb"
            ]
          ]
        },
        "_ingest" : {
          "_value" : null,
          "timestamp" : "2021-04-28T13:28:20.762312Z"
        }
      }
    }
  ]
}

Rename

重命名一个字段, rename 往往和 reindex 结合使用

POST goods_info_comment_message/_bulk
{"index":{"_id":1}}
{"message":"美 国苹果 "}
{"index":{"_id":2}}
{"message":"山东 苹果 "}


#定义 rename_pipeline
PUT _ingest/pipeline/rename_pipeline
{
  "processors": [
    {
      "rename": {
        "field": "message",
        "target_field": "message_new"
      }
    }
  ]
}

#重建 index
POST _reindex
{
  "source": {
    "index": "goods_info_comment_message"
  },
  "dest": {
    "index": "goods_info_comment_message_new",
    "pipeline": "rename_pipeline"
  }
}

#查询 mapping
GET goods_info_comment_message_new/_mapping

#返回
{
  "goods_info_comment_message_new" : {
    "mappings" : {
      "properties" : {
        "message_new" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    }
  }
}

Lowercase / Uppercase

将字符串修改为大写或者小写

PUT _ingest/pipeline/lowercase_pipeline
{
  "description": "lowercase processor",
  "processors": [
    {
      "lowercase": {
        "field": "message"
      }
    }
  ]
}

#测试,部分字符大写
POST _ingest/pipeline/lowercase_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "CAr222 aaa",
          " auto2222 aaaa Bbb"
        ]
      }
    }
  ]
}

#结果,全部输出为小写
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : [
            "car222 aaa",
            " auto2222 aaaa bbb"
          ]
        },
        "_ingest" : {
          "timestamp" : "2021-04-28T15:12:10.041308Z"
        }
      }
    }
  ]
}

Remove

移除已经存在的字段

#定义remove pipelint
PUT _ingest/pipeline/remove_pipeline
{
  "description": "remove processor",
  "processors": [
    {
      "remove": {
        "field": "message"
      }
    }
  ]
}

#测试
POST _ingest/pipeline/remove_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "CAr222 aaa",
          " auto2222 aaaa Bbb"
        ]
      }
    }
  ]
}
#返回,可以看到message字段已经被移除
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : { },
        "_ingest" : {
          "timestamp" : "2021-04-28T15:15:27.811516Z"
        }
      }
    }
  ]
}

Set

给已有字段进行赋值

PUT _ingest/pipeline/set_pipeline
{
  "description": "set processor",
  "processors": [
    {
      "set": {
        "field": "message",
        "value": "this is a new message"
      }
    }
  ]
}


POST _ingest/pipeline/set_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "this"
      }
    }
  ]
}

#返回
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : "this is a new message"
        },
        "_ingest" : {
          "timestamp" : "2021-04-28T15:21:28.928512Z"
        }
      }
    }
  ]
}

Kibana Dev Tools 管理 Pipeline

下面介绍如何在 kibana 中通过界面来创建 Pipeline,打开 Kibana 首页:

2.png

选择 Ingest Node Pipelines,右边会展示已有的 Pipeline 列表

3.png

选择新创建 Ppipeline

4.png

5.png

我们选择创建一个 lowercase processor

6.png

点击 Add documents 进行相关测试

7.png

添加测试文档:

[
  {
    "_index": "index_lowercase",
    "_id": "1",
    "_source": {
      "message": "This is a Test"
    }
  }
]

8.png

可以看到,测试成功,字符串全部变为了小写

9.png

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享: