Ingest pipelines—Elastic Stack 实战手册

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 本文着重介绍 Ingest Pipeline,以下比较了 Logstash 与 Ingest Pipeline的一些区别,便于在实际业务场景中选择

970X90.png

· 更多精彩内容,请下载阅读全本《Elastic Stack实战手册》

· 加入创作人行列,一起交流碰撞,参与技术圈年度盛事吧

创作人:李增胜

Elastic 提供了三种方式进行数据加工处理:Logstash、Beats Processors 以及 Ingest Pipeline,本文着重介绍 Ingest Pipeline,以下比较了 Logstash 与 Ingest Pipeline的一些区别,便于在实际业务场景中选择:

种类 部署 数据缓冲 数据处理 数据源
Logstash 需要另外部署,增加复杂性 采用队列机制缓冲数据,多队列支持 支持大量processors,远超 ingest 支持外部数据源,如MYSQL、Kafka、Beats等
Ingest pipeline 无需另外部署,易于扩展 无缓冲策略 支持超过30种processors Ingest 也可和 Beats 或者 Logstash 解决特定场景数据源问题

总结:

  • 如果业务场景 Ingest pipeline 已经能处理完成,则无需使用 Logstash ,相反,如果业务处理数据场景要支持外部数据源,则选择 Logstash
  • 如果业务场景需要缓冲数据,则采用 Logstash 较优
  • 如果数据处理完成后需要输出到非 Elasticsearch 内部,则采用 Logstash
  • 在简化配置方便,如果想配置简单,则选择 Elasticsearch ingest pipeline 即可

显然,Ingest pipeline 并非 Logstatsh 的替代品,需要根据自己的业务处理数据的要求和架构设计来选择对应的技术,并非二选一,也可以同时使用,对处理不同数据采用不同的技术架构。

Kibana Dev Tools 管理 Pipeline

Ingest Pipeline

用于预处理数据,由 Elasticsearch Ingest Node 节点负责运行处理,如需要系统性能提升可单独部署 Ingest Node 节点

优点:

  • 由 Ingest Node 节点负责处理,职责清晰
  • 更多 Processors 支持,扩展性强
  • 轻量级,覆盖了 Logstash 大多常用场景

Ingest Pipeline 是一系列处理管道,由一系列的 Processors 组成处理,先来看下 pipeline 的处理过程:

1.png

在 Kibana 中也可以创建 Ingest pipeline,在稍微章节给出示例。

常用 的 Processors 如下

更多 Pipeline Processors 参考更多; https://www.elastic.co/guide/en/elasticsearch/reference/master/processors.html

Trim

去除空格,如果是字符串类型的数组,数组中所有字符串都会被替换空格处理

Split

切分字符串,使用指定切分符,切分字符串为数组结构,只作用与字符串类型

Rename

重命名字段

Foreach

对一组数据进行相同的预处理,可以使用 Foreach

Lowercase / Uppercase

对字段进行大小写转换

Script

使用脚本语言进行数据预处理

Gsub

对字符串进行替换

Append

添加数据到数组

Set

设置字段值

Remove

移除字段

Trim

去除字符串中的空格

PUT _ingest/pipeline/trim_pipeline
{
  "processors": [
    {
      "foreach": {
        "field": "message",
        "processor": {
          "trim": {
            "field": "_ingest._value"
          }
        }
      }
    }
  ]
}

POST _ingest/pipeline/trim_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "car222 ",
          " auto2222 "
        ]
      }
    }
  ]
}

#返回:
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : [
            "car222",
            "auto2222"
          ]
        },
        "_ingest" : {
          "_value" : null,
          "timestamp" : "2021-04-28T13:19:13.542743Z"
        }
      }
    }
  ]
}

Split / Foreach

切分字符串,使用指定切分符,切分字符串为数组结构,只作用于字符串类型

PUT _ingest/pipeline/split_pipeline
{
  "processors": [
    {
      "foreach": {
        "field": "message",
        "processor": {
          "split": {
            "field": "_ingest._value",
            "separator": " "
          }
        }
      }
    }
  ]
}

#测试
POST _ingest/pipeline/split_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "car222 aaa",
          " auto2222 aaaa bbb"
        ]
      }
    }
  ]
}
#返回,可以看到 message 按照空格切分为了多个字符串数组
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : [
            [
              "car222",
              "aaa"
            ],
            [
              "",
              "auto2222",
              "aaaa",
              "bbb"
            ]
          ]
        },
        "_ingest" : {
          "_value" : null,
          "timestamp" : "2021-04-28T13:28:20.762312Z"
        }
      }
    }
  ]
}

Rename

重命名一个字段, rename 往往和 reindex 结合使用

POST goods_info_comment_message/_bulk
{"index":{"_id":1}}
{"message":"美 国苹果 "}
{"index":{"_id":2}}
{"message":"山东 苹果 "}


#定义 rename_pipeline
PUT _ingest/pipeline/rename_pipeline
{
  "processors": [
    {
      "rename": {
        "field": "message",
        "target_field": "message_new"
      }
    }
  ]
}

#重建 index
POST _reindex
{
  "source": {
    "index": "goods_info_comment_message"
  },
  "dest": {
    "index": "goods_info_comment_message_new",
    "pipeline": "rename_pipeline"
  }
}

#查询 mapping
GET goods_info_comment_message_new/_mapping

#返回
{
  "goods_info_comment_message_new" : {
    "mappings" : {
      "properties" : {
        "message_new" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    }
  }
}

Lowercase / Uppercase

将字符串修改为大写或者小写

PUT _ingest/pipeline/lowercase_pipeline
{
  "description": "lowercase processor",
  "processors": [
    {
      "lowercase": {
        "field": "message"
      }
    }
  ]
}

#测试,部分字符大写
POST _ingest/pipeline/lowercase_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "CAr222 aaa",
          " auto2222 aaaa Bbb"
        ]
      }
    }
  ]
}

#结果,全部输出为小写
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : [
            "car222 aaa",
            " auto2222 aaaa bbb"
          ]
        },
        "_ingest" : {
          "timestamp" : "2021-04-28T15:12:10.041308Z"
        }
      }
    }
  ]
}

Remove

移除已经存在的字段

#定义remove pipelint
PUT _ingest/pipeline/remove_pipeline
{
  "description": "remove processor",
  "processors": [
    {
      "remove": {
        "field": "message"
      }
    }
  ]
}

#测试
POST _ingest/pipeline/remove_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": [
          "CAr222 aaa",
          " auto2222 aaaa Bbb"
        ]
      }
    }
  ]
}
#返回,可以看到message字段已经被移除
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : { },
        "_ingest" : {
          "timestamp" : "2021-04-28T15:15:27.811516Z"
        }
      }
    }
  ]
}

Set

给已有字段进行赋值

PUT _ingest/pipeline/set_pipeline
{
  "description": "set processor",
  "processors": [
    {
      "set": {
        "field": "message",
        "value": "this is a new message"
      }
    }
  ]
}


POST _ingest/pipeline/set_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "this"
      }
    }
  ]
}

#返回
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : "this is a new message"
        },
        "_ingest" : {
          "timestamp" : "2021-04-28T15:21:28.928512Z"
        }
      }
    }
  ]
}

Kibana Dev Tools 管理 Pipeline

下面介绍如何在 kibana 中通过界面来创建 Pipeline,打开 Kibana 首页:

2.png

选择 Ingest Node Pipelines,右边会展示已有的 Pipeline 列表

3.png

选择新创建 Ppipeline

4.png

5.png

我们选择创建一个 lowercase processor

6.png

点击 Add documents 进行相关测试

7.png

添加测试文档:

[
  {
    "_index": "index_lowercase",
    "_id": "1",
    "_source": {
      "message": "This is a Test"
    }
  }
]

8.png

可以看到,测试成功,字符串全部变为了小写

9.png

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
11月前
|
数据采集
带你读《Elastic Stack 实战手册》之30:——3.4.2.15.ingest pipelines(2)
带你读《Elastic Stack 实战手册》之30:——3.4.2.15.ingest pipelines(2)
|
11月前
|
数据处理
带你读《Elastic Stack 实战手册》之30:——3.4.2.15.ingest pipelines(1)
带你读《Elastic Stack 实战手册》之30:——3.4.2.15.ingest pipelines(1)
|
11月前
带你读《Elastic Stack 实战手册》之30:——3.4.2.15.ingest pipelines(3)
带你读《Elastic Stack 实战手册》之30:——3.4.2.15.ingest pipelines(3)
|
11月前
|
项目管理 微服务
带你读《Elastic Stack 实战手册》之30:——3.4.2.15.ingest pipelines(4)
带你读《Elastic Stack 实战手册》之30:——3.4.2.15.ingest pipelines(4)
|
11月前
|
测试技术 Apache
带你读《Elastic Stack 实战手册》之3:——3.1.1.从 Elasticsearch 到 Elastic Stack(下)
带你读《Elastic Stack 实战手册》之3:——3.1.1.从 Elasticsearch 到 Elastic Stack(下)
146 0
|
11月前
|
存储 安全 数据可视化
带你读《Elastic Stack 实战手册》之3:——3.1.1.从 Elasticsearch 到 Elastic Stack(中)
带你读《Elastic Stack 实战手册》之3:——3.1.1.从 Elasticsearch 到 Elastic Stack(中)
148 0
|
11月前
|
数据采集 数据可视化 搜索推荐
带你读《Elastic Stack 实战手册》之3:——3.1.1.从 Elasticsearch 到 Elastic Stack(上)
带你读《Elastic Stack 实战手册》之3:——3.1.1.从 Elasticsearch 到 Elastic Stack(上)
212 0
|
11月前
|
定位技术
带你读《Elastic Stack 实战手册》之18:——3.4.2.3.Search通过Kibana(19)
带你读《Elastic Stack 实战手册》之18:——3.4.2.3.Search通过Kibana(19)
|
11月前
带你读《Elastic Stack 实战手册》之18:——3.4.2.3.Search通过Kibana(18)
带你读《Elastic Stack 实战手册》之18:——3.4.2.3.Search通过Kibana(18)
|
11月前
|
API 数据库
带你读《Elastic Stack 实战手册》之18:——3.4.2.3.Search通过Kibana(5)
带你读《Elastic Stack 实战手册》之18:——3.4.2.3.Search通过Kibana(5)

相关课程

更多