elasticsearch pipelineI详解:原理与使用

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: elasticsearch pipelineI详解:原理与使用

一、Pipeline 背景和原理

Elasticsearch 5.0之前的文档预处理

在 Elasticsearch 5.0 版本之前,如果用户希望在文档被索引到 Elasticsearch 之前进行预处理,他们通常需要依赖外部工具,如 Logstash,或者以编程方式/手动进行预处理。这是因为早期的 Elasticsearch 版本并不提供文档预处理或转换的能力,它仅仅是将文档按原样索引。


Ingest Node的引入

从 Elasticsearch 5.x 版本开始,为了解决这个问题,Elasticsearch 引入了一个名为 ingest node 的功能。Ingest node 为 Elasticsearch 本身提供了文档预处理和丰富的轻量级解决方案。这意味着用户可以在 Elasticsearch 内部直接对文档进行预处理,而无需依赖外部工具。


Ingest Node的工作原理

当数据进入 Elastic 集群并指定了特定的 Pipeline 时,Elasticsearch 中的 ingest node 会按照定义好的处理器(processor)顺序对数据进行操作和处理。这种预处理是通过截取批量和索引请求在 ingest node 上执行的,处理完成后将文档传递回索引或批量 API。

要在索引之前预处理文档,用户必须定义一个 Pipeline。Pipeline 是一系列处理器的集合,用于转换传入的文档。每个处理器都以某种方式转换文档,并且它们按照在 Pipeline 中定义的顺序执行。

要使用 Pipeline,用户只需在索引或批量请求上指定 pipeline 参数,告诉 ingest node 使用哪个 Pipeline。

Ingest Node的配置与灵活性

如果使用默认配置实现 Elasticsearch 节点,默认情况下将启用 master、data 和 ingest 功能,这意味着节点将充当主节点、数据节点和提取节点。但是,如果用户在 elasticsearch.yml 文件中配置了 node.ingest: false,则该节点上的 ingest 功能将被禁用。


与 Logstash 相比,Elasticsearch 的 ingest node 提供了更高的灵活性。因为用户可以通过编程的方式随时修改 Pipeline,而无需重启整个 Logstash 集群。

Elasticsearch对Logstash的替代

随着新的 ingest 功能的发布,Elasticsearch 已经取出了 Logstash 的部分功能,特别是其过滤器部分。这意味着用户现在可以在 Elasticsearch 中直接处理原始日志,而无需先通过 Logstash 进行过滤和预处理。这进一步简化了数据处理流程,并提高了系统的整体性能。

二、Pipeline API使用

要使用Pipeline API,首先需要定义Pipeline。Pipeline由两部分组成:描述(description)和处理器列表(processor list)。

描述(Description):这是一个非必需字段,用于存储关于Pipeline的一些描述性信息,如用途、作者等。虽然这个字段不是必需的,但它对于理解和维护Pipeline非常有帮助。

处理器列表(Processor List):这是Pipeline的核心部分,它定义了用于转换文档的处理器序列。每个处理器以某种方式转换文档,如替换文本、转换数据类型、删除字段等。处理器按照在Pipeline中定义的顺序执行。

Elasticsearch提供了大约20个内置的处理器,这些处理器可以在构建Pipeline时使用。此外,还可以使用一些插件提供的处理器,如Ingest Attachment用于处理附件数据、Ingest Geo-IP用于根据IP地址提取地理位置信息等。这些插件增强了Pipeline的数据处理能力。


定义好Pipeline后,就可以通过在索引或批量请求上指定Pipeline参数来使用它。例如,当通过POST请求将数据发送到指定索引时,可以带上pipeline参数来指定使用的Pipeline。

1. 定义 Pipeline

使用 PUT 请求和 _ingest/pipeline/<pipeline_id> 端点来定义一个新的 Pipeline 或更新一个已存在的 Pipeline。Pipeline 的定义包含了一个可选的 description 字段和一个 processors 列表。

例如,定义一个名为 firstpipeline 的 Pipeline,它将消息字段(message)中的值转换为大写:

PUT _ingest/pipeline/firstpipeline
{
  "description": "将 message 字段中的值转换为大写",
  "processors": [
    {
      "uppercase": {
        "field": "message"
      }
    }
  ]
}

2. 使用 Pipeline

要在索引文档之前使用定义的 Pipeline,只需在索引或批量请求的 URL 中添加 ?pipeline=<pipeline_id> 参数。

例如,使用之前定义的 firstpipeline 来索引一个文档:

PUT my_index/_doc/1?pipeline=firstpipeline
{
  "name": "pipeline",
  "message": "this is so cool!"
}

执行上述请求后,索引到 my_index 中的文档将具有大写形式的 message 字段。

3. 获取 Pipeline 信息

使用 GET 请求和 _ingest/pipeline 端点可以检索现有 Pipeline 的定义。

例如,要获取所有 Pipeline 的定义:

GET _ingest/pipeline

或者,要获取特定 Pipeline(如 secondpipeline)的定义:

GET _ingest/pipeline/secondpipeline

4. 删除 Pipeline

使用 DELETE 请求和 _ingest/pipeline/<pipeline_id> 端点可以删除一个 Pipeline。

例如,删除名为 firstpipeline 的 Pipeline:

DELETE _ingest/pipeline/firstpipeline


5. 模拟 Pipeline

使用 _simulate 端点可以模拟 Pipeline 的执行,而不实际索引文档。这对于测试 Pipeline 定义和查看预期结果非常有用。

例如,模拟 secondpipeline 对提供的文档集的执行:

POST _ingest/pipeline/secondpipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "name": "pipeline",
        "message": "this is so cool!"
      }
    },
    {
      "_source": {
        "name": "nice",
        "message": "this is nice!"
      }
    }
  ]
}

上述请求将返回模拟执行后的文档,并显示每个文档经过 Pipeline 处理后的结果。

6. 引用其他 Pipeline

在 Pipeline 的定义中,还可以引用其他已存在的 Pipeline。这允许用户创建复杂的文档处理流程,通过组合多个 Pipeline 来实现。

例如,先定义一个 pipelineA,然后在 pipelineB 中引用它:

PUT _ingest/pipeline/pipelineA
{
  "description": "内部 Pipeline",
  "processors": [
    {
      "set": {
        "field": "inner_pipeline_set",
        "value": "inner"
      }
    }
  ]
}

PUT _ingest/pipeline/pipelineB
{
  "description": "外部 Pipeline",
  "processors": [
    {
      "pipeline": {
        "name": "pipelineA"
      }
    },
    {
      "set": {
        "field": "outer_pipeline_set",
        "value": "outer"
      }
    }
  ]
}

在上述示例中,当使用 pipelineB 索引文档时,首先会执行 pipelineA 的处理器,然后再执行 pipelineB 中定义的其他处理器。

三、Pipeline API应用场景

Pipeline API在数据预处理方面有着广泛的应用。以下是一些具体的应用场景:

  1. 数据清洗:通过Pipeline API,可以在数据索引到Elasticsearch之前对数据进行清洗,去除无用的字段、转换数据类型、处理缺失值等。这有助于确保数据的准确性和一致性。
  2. 日志处理:对于日志数据,Pipeline API非常有用。它可以用于解析和格式化日志数据,提取出有用的字段进行索引,以便于后续的查询和分析。例如,可以使用Grok处理器来解析复杂的日志行。
  3. 数据增强:除了基本的数据清洗和转换外,Pipeline API还可以用于数据增强。例如,通过Ingest Geo-IP插件,可以根据IP地址提取出地理位置信息并添加到文档中;通过Ingest User-Agent插件,可以解析用户代理字符串并提取出浏览器、操作系统等信息。
  4. 动态修改Pipeline:由于Pipeline API支持编程方式修改,因此可以根据实际需求动态地修改Pipeline。这意味着当数据格式或处理需求发生变化时,无需修改源代码或重启Elasticsearch集群,只需通过API调用即可更新Pipeline。

四、Pipeline 应用方式

  1. 在 Bulk API 中使用
    使用 Bulk API 时,可以指定 pipeline 来预处理批量文档。例如:
POST _bulk
{"index": {"_index": "my_index", "_id" : "1", "pipeline": "my_pipeline"}}
{"name": "zhang san", "category": "sports"}
  1. 对于 Bulk API 请求,可以包含多个操作(如 index, update, delete 等),并为每个操作指定不同的 pipeline(如果需要)。
  2. 在 Beats 中使用

在 Filebeat 或其他 Beats 中,可以通过配置 pipeline processor 来预处理事件数据。这允许在数据发送到 Elasticsearch 之前进行必要的转换和增强。具体可参阅 Elastic 官方文档中关于 Beats 和 pipeline processor 的部分。

在 Reindex API 中使用

当从一个索引重新索引到另一个索引时,可以使用 pipeline 来预处理数据。例如:

POST _reindex
{
  "source": {
    "index": "source_index"
  },
  "dest": {
    "index": "destination_index",
    "pipeline": "some_ingest_pipeline"
  }
}
  1. 这样,从 source_index 重新索引到 destination_index 的所有文档都将通过 some_ingest_pipeline 进行预处理。
  2. 在 Enrich Processors 中使用

Elasticsearch 的 enrich processor 允许你根据其他索引中的数据进行数据丰富。结合 ingest pipeline,可以在数据丰富之前对文档进行预处理。例如,可以在 enrich processor 之前使用 pipeline 来提取或转换字段,以确保它们可用于 enrich processor。

在 Update By Query API 中使用

使用 Update By Query API 更新索引中的文档时,可以通过指定 pipeline 来预处理这些文档。例如:

POST my_index/_update_by_query?pipeline=my_pipeline
{
  "query": {
    "match": {
      "some_field": "some_value"
    }
  }
}
  1. 上述请求将更新 my_index 中满足 some_field: some_value 条件的文档,并在更新前通过 my_pipeline 对它们进行预处理。
  2. 在索引中设置 Default Pipeline

对于特定索引,可以通过设置默认 pipeline 来确保所有新索引的文档都经过该 pipeline 的处理。例如:

PUT my_index
{
  "settings": {
    "index.default_pipeline": "my_pipeline"
  }
}
  1. 此后,任何索引到 my_index 的新文档都将默认通过 my_pipeline 进行预处理。注意,在较新版本的 Elasticsearch 中,设置方式可能有所变化,请查阅相应版本的官方文档。

五、内置 Processors

默认情况下,Elasticsearch 提供大量的ingest处理器。 可以在地址https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest-processors.html 找到已经为我设计好的内置的 processors。下面是一些常见的一些 processor :


相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
8月前
|
存储 搜索推荐 数据挖掘
|
8月前
|
自然语言处理 API 索引
Elasticsearch Analyzer原理分析并实现中文分词
Elasticsearch Analyzer原理分析并实现中文分词
129 0
|
6月前
|
存储 数据采集 数据处理
数据处理神器Elasticsearch_Pipeline:原理、配置与实战指南
数据处理神器Elasticsearch_Pipeline:原理、配置与实战指南
225 12
|
7月前
|
存储 缓存 负载均衡
elasticsearch写入流程和请求检索流程原理全方位解析
elasticsearch写入流程和请求检索流程原理全方位解析
|
7月前
|
存储 监控 固态存储
elasticsearch索引生命周期管理(ILM):原理和实践
elasticsearch索引生命周期管理(ILM):原理和实践
|
7月前
|
缓存 自然语言处理 监控
elasticsearch过滤器filter:原理及使用
elasticsearch过滤器filter:原理及使用
|
7月前
|
存储 数据库 开发者
Elasticsearch中的三种分页策略深度解析:原理、使用及对比
Elasticsearch中的三种分页策略深度解析:原理、使用及对比
|
7月前
|
缓存 监控 安全
深入解析Elasticsearch中脚本原理
深入解析Elasticsearch中脚本原理
|
7月前
|
缓存 监控 Java
深入Elasticsearch:线程池的原理与应用
深入Elasticsearch:线程池的原理与应用
|
存储 自然语言处理 负载均衡