ES中摄取管道详解

简介: ES中摄取管道详解

一、什么是摄取管道


摄取管道 Ingest pipelines


摄取管道主要用来在数据被索引之前对数据执行常见的转换。

例如,您可以使用管道来移除字段、从文本中提取值以及丰富数据。


管道由一系列称为处理器的可配置任务组成。每个处理器按顺序运行,对传入的文档进行特定的更改。在处理器运行之后,Elasticsearch 将转换后的文档添加到数据流或索引中。


管道的工作流程图如下:

10.png


二、摄取管道使用


1.创建管道

方式一:在kibana中创建

Stack Management > Ingest Pipelines


方式二:采用API创建

下面的 create pipeline API 请求创建一个包含两个 set 处理器和一个小写处理器的管道。处理器按指定的顺序顺序运行。

PUT _ingest/pipeline/my-pipeline
{
  "description": "My optional pipeline description",
  "processors": [
    {
      "set": {
        "description": "My optional processor description",
        "field": "my-long-field",
        "value": 10
      }
    },
    {
      "set": {
        "description": "Set 'my-boolean-field' to true",
        "field": "my-boolean-field",
        "value": true
      }
    },
    {
      "lowercase": {
        "field": "my-keyword-field"
      }
    }
  ]
}


2.测试管道

方式一:在kibana中测试

选择创建的管道,打开编辑页面,测试管道——》添加文档

7.png

8.png

9.png


方式二:采用_simulate的API测试

1、在请求URL中指定管道

POST _ingest/pipeline/my-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "my-keyword-field": "FOO"
      }
    },
    {
      "_source": {
        "my-keyword-field": "BAR"
      }
    }
  ]
}

2、在请求body中指定管道

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "lowercase": {
          "field": "my-keyword-field"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "my-keyword-field": "FOO"
      }
    },
    {
      "_source": {
        "my-keyword-field": "BAR"
      }
    }
  ]
}


3.在索引请求中使用管道

说明:在向索引my-data-stream添加数据时,使用管道y-pipeline


POST my-data-stream/_doc?pipeline=my-pipeline
{
  "@timestamp": "2099-03-07T11:04:05.000Z",
  "my-keyword-field": "foo"
}
PUT my-data-stream/_bulk?pipeline=my-pipeline
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:06.000Z", "my-keyword-field": "foo" }
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:07.000Z", "my-keyword-field": "bar" }


在使用_update_by_query和_reindex时使用管道:

POST my-data-stream/_update_by_query?pipeline=my-pipeline
POST _reindex
{
  "source": {
    "index": "my-data-stream"
  },
  "dest": {
    "index": "my-new-data-stream",
    "op_type": "create",
    "pipeline": "my-pipeline"
  }
}

4.给索引设置默认管道

通过index.default_pipeline属性,可以给索引设置默认的管道。


5.索引模板中设置默认管道

PUT _component_template/logs-my_app-settings
{
  "template": {
    "settings": {
      "index.default_pipeline": "logs-my_app-default",
      "index.lifecycle.name": "logs"
    }
  }
}

6.管道异常处理

PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Index document to 'failed-<index>'",
        "field": "_index",
        "value": "failed-{{{ _index }}}"
      }
    }
  ]
}


三、管道功能演示


1、字段重命名

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "ignore_failure": true
      }
    }
  ]
}


2、删除特定记录

这里采用if配置管道处理函数的触发条件。

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents with 'network.name' of 'Guest'",
        "if": "ctx?.network?.name == 'Guest'"
      }
    }
  ]
}


更复杂的条件可以采用scripts脚本:

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents that don't contain 'prod' tag",
        "if": """
            Collection tags = ctx.tags;
            if(tags != null){
              for (String tag : tags) {
                if (tag.toLowerCase().contains('prod')) {
                  return false;
                }
              }
            }
            return true;
        """
      }
    }
  ]
}


注意⚠️:

尽量避免使用复杂或昂贵的条件脚本,昂贵的条件脚本会降低索引速度。


3、给字段赋值

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "field": "_source.my-long-field",
        "value": 10
      }
    }
  ]
}


采用元数据赋值

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Index the ingest timestamp as 'event.ingested'",
        "field": "event.ingested",
        "value": "{{{_ingest.timestamp}}}"
      }
    }
  ]
}


总结


本文主要介绍了ES中摄取管道pipeline的使用。

摄取管道主要用来在数据被索引之前对数据执行常见的转换。

可以使用管道来移除字段、从文本中提取值以及丰富数据。

目录
相关文章
|
消息中间件 Kafka API
数据管道 Logstash 入门(上)
数据管道 Logstash 入门
117 0
|
5月前
|
传感器 PyTorch 数据处理
流式数据处理:DataLoader 在实时数据流中的作用
【8月更文第29天】在许多现代应用中,数据不再是以静态文件的形式存在,而是以持续生成的流形式出现。例如,传感器数据、网络日志、社交媒体更新等都是典型的实时数据流。对于这些动态变化的数据,传统的批处理方式可能无法满足低延迟和高吞吐量的要求。因此,开发能够处理实时数据流的系统变得尤为重要。
217 1
|
消息中间件 存储 NoSQL
数据管道 Logstash 入门(下)
数据管道 Logstash 入门
105 0
|
Unix 数据处理 Python
怎么还蹦出来个 “ 数据管道 ”
怎么还蹦出来个 “ 数据管道 ”
|
存储 数据采集 缓存
大数据数据采集的数据采集(收集/聚合)的Flume之基本组件的Channel:临时存储数据的管道
在Flume中,Channel是数据采集和传输过程中的一个重要组件。它负责存储从Source获取的数据,并将其转发给Sink进行处理和存储。
164 0
|
NoSQL Shell Linux
如何使用 Flupy 构建数据处理管道
如何使用 Flupy 构建数据处理管道
169 0
|
存储
ES聚合查询详解(四):管道聚合
ES聚合查询详解(四):管道聚合
568 0
ES聚合查询详解(四):管道聚合
|
SQL NoSQL JavaScript
mongo 进阶之——聚合管道
上面这句话的意思是,先用pumber来进行分组,会有两个字段,一个是"_id"和"count",在后一个管道中用1表示显示,0表示不显示
mongo 进阶之——聚合管道
|
NoSQL MongoDB
MongoDB 聚合管道(Aggregation Pipeline)
MongoDB 聚合管道(Aggregation Pipeline)
276 0
MongoDB 聚合管道(Aggregation Pipeline)
|
数据采集 Dubbo 应用服务中间件
使用 Logstash 导入流式数据|学习笔记
快速学习使用 Logstash 导入流式数据
141 0
使用 Logstash 导入流式数据|学习笔记