Elasticsearch Pipeline 详解

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: Elasticsearch Pipeline 详解

说明

本文是建立在有一些 Elasticsearch 基础和了解相关 Pipeline 概念的人

Ingest Node

简介 Ingest Node

Ingest Node(预处理节点)是 ES 用于功能上命名的一种节点类型,可以通过在 elasticsearch.xml 进行如下配置来标识出集群中的某个节点是否是 Ingest Node.

node.ingest: false

上述将 node.ingest 设置成 false,则表明当前节点不是 Ingest Node,不具有预处理能力,当然 Elasticsearch 默认所有节点都是 Ingest Node,即集群中所有的节点都具有预处理能力.

何为预处理呢?

用过 Logstash 对日志进行处理的用户都知道,一般情况下我们并不会直接将原始的日志进行加载到 Elasticsearch 集群,而是对原始日志信息进行(深)加工后保存到 Elasticsearch 集群中.比如 Logstash 支持多种解析器比如 json,kv,date 等,比较经典的是 grok.这里我们不会对 Logstash 的解析器进行详细说明,只是为了描述一个问题,有些时候我们需要 Logstash 来对加载到 Elasticsearch 中的数据进行处理,这个处理,从概念上而言,我们也能称之为"预处理.而这里我们所说的预处理也其实就是类似的概念.

可以这么说,在 Elasticsearch 没有提供 IngestNode 这一概念时,我们想对存储在 Elasticsearch 里的数据在存储之前进行加工处理的话,我们只能依赖 Logstash 或自定义插件来完成这一功能,但是在 Elasticsearch 5.x 版本中,官方在内部集成了部分 Logstash 的功能,这就是 Ingest,而具有 Ingest 能力的节点称之为 Ingest Node.

请查看 Filter plugins 来了解更多关于 Logstash 解析器的内容.

如果要脱离 Logstash 来对在 Elasticsearch 写入文档之前对文档进行加工处理,比如为文档某个字段设置默认值,重命名某个字段,设置通过脚本来完成更加复杂的加工逻辑,我们则必须要了解两个基本概念: Pipeline 和 Processors.

参考: Ingest Node 来了解更多关于上述两个概念.下文只是简单说明两者.

简介 Pipeline、Processors

管道(Pipeline)是众所周知的一个概念,Elasticsearch 引入这一概念,是为了让那些有过工作经验的人来说更直白,更轻松的理解这一概念.本人是主要用 Java 进行开发,这里就以 Pipeline 和 java 中的 Stream 进行类比,两者从功能和概念上很类似,我们经常会对 Stream 中的数据进行处理,比如 map 操作,peek 操作,reduce 操作,count 操作等,这些操作从行为上说,就是对数据的加工,而 Pipeline 也是如此,Pipeline 也会对通过该 Pipeline 的数据(一般来说是文档)进行加工,比如上面说到的,修改文档的某个字段值,修改文档某个字段的类型等等.而 Elasticsearch 对该加工行为进行抽象包装,并称之为 Processors.Elasticsearch 命名了多种类型的 Processors 来规范对文档的操作,比如 set,append,date,join,json,kv 等等.这些不同类型的 Processors,我们会在后文进行说明

Pipeline 定义

定义一个 Pipeline 是件很简单的事情,官方给出了参考:

PUT _ingest/pipeline/my-pipeline-id
{
  "description" : "describe pipeline",
  "processors" : [
    {
      "set" : {
        "field": "foo",
        "value": "bar"
      }
    }
  ]
}

上面的例子,表明通过指定的 URL 请求"_ingest/pipeline"定义了一个 ID 为"my-pipeline-id"的 pipeline,其中请求体中的存在两个必须要的元素:

  • description 描述该 pipeline 是做什么的
  • processors 定义了一系列的 processors,这里只是简单的定义了一个赋值操作,即将字段名为"foo"的字段值都设置为"bar"

如果需要了解更多关于 Pipeline 定义的信息,可以参考: Ingest APIs

简介 Simulate Pipeline API

既然 Elasticsearch 提供了预处理的能力,总不能是黑盒处理吧,为了让开发者更好的了解和使用预处理的方式和原理,官方也提供了相关的接口,来让我们对这些预处理操作进行测试,这些接口,官方称之为: Simulate Pipeline API.想要深入了解 pipeline 以及 processors 的使用,我们基本离不开 Simulate Pipeline API 来辅助帮我们完成许多工作.

下面是一个比较简单的 Simulate Pipeline API 的例子:

POST _ingest/pipeline/_simulate
{
  "pipeline" : {
    // pipeline definition here
  },
  "docs" : [
    { "_source": {/** first document **/} },
    { "_source": {/** second document **/} },
    // ...
  ]
}

上面的例子中,在请求 URL 中并没有明确指定使用哪个 pipeline,这种情况下需要我们在请求体中即时定义一个,当然我们也可以在请求 URL 中指定一个 pipeline 的 ID,如下面例子:

POST _ingest/pipeline/my-pipeline-id/_simulate
{
  "docs" : [
    { "_source": {/** first document **/} },
    { "_source": {/** second document **/} },
    // ...
  ]
}

这样一来,在请求体中我们定义的"docs"中的内容就能够使用该 pipeline 和该 pipeline 种的 processors,下面是一个比较具体的例子:

POST _ingest/pipeline/_simulate
{
  "pipeline" :
  {
    "description": "_description",
    "processors": [
      {
        "set" : {
          "field" : "field2",
          "value" : "_value"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    },
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "foo": "rab"
      }
    }
  ]
}

在上面具体的例子中,我们并没有在请求 URL 中指定使用哪个 pipeline,因此我们不得不在请求体中即时定义一个 pipeline 和对应的 processors,例子很简单,只是让通过该 pipeline 的的文档中的字段名称为"field2"的字段值为"_value",文档我们也指定了,就是"docs"中定义的文档,该请求的响应信息如下:

{
   "docs": [
      {
         "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "type",
            "_source": {
               "field2": "_value",
               "foo": "bar"
            },
            "_ingest": {
               "timestamp": "2017-05-04T22:30:03.187Z"
            }
         }
      },
      {
         "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "type",
            "_source": {
               "field2": "_value",
               "foo": "rab"
            },
            "_ingest": {
               "timestamp": "2017-05-04T22:30:03.188Z"
            }
         }
      }
   ]
}

从具体的响应结果中看到,在文档通过 pipeline 时(或理解为被 pipeline 中的 processors 加工后),新的文档与原有的文档产生了差异,这些差异体现为:

  1. 文档都新增了 field2 字段,这点我们可以通过对比响应前后的定义在"docs"中文档的_source 内容中看出
  2. 额外增加了一些临时(官方称之为瞬态)的字段,比如 timestamp,他们都在_ingest 节点下,(这些)字段都是临时与源文档存在一起,在被 pipeline 中的 processors 加工后后返回给对应的批量操作或索引操作之后,这些信息就不会携带返回.

访问 Pipeline 中的内容

如果我们只是定义一个 Pipeline 而不能访问该 Pipeline 的上下文信息,那么设计 Pipeline 就显然是多此一举.与 logstash 不同的时候,logstash 环境与 Elasticsearch 环境是隔离的,两者无法形成上下文环境,而 Pipeline 则不同,天然的集成在 Elasticsearch 中,从而很好的与其形成上线文环境,这个环境让 Pipeline 能够更充分的利用起来,比如与 Pipeline 能够形成最直接的上线文关系的是文档信息,由此可见,我们可以在 Pipeline(应该是 processors 中)能够直接访问通过 pipeline 的文档信息,如文档的字段,元数据信息.

下面的例子说明了我们如何在 processors 中访问这些上下文信息.

访问源文档字段

{
  "set": {
    "field": "my_field"
    "value": 582.1
  }
}

这个例子直接引用了通过该 Pipeline 内的文档字为"my_field"的字段,并将所有文档该字段的值设置为 582.1.

或者我们也可以通过_source 字段来访问源文档的字段,如下

{
  "set": {
    "field": "_source.my_field"
    "value": 582.1
  }
}

访问文档的元数据字段

每个文档都会有一些元数据字段信息(metadata filed),比如_id,_index,_type 等,我们在 processors 中也可以直接访问这些信息的,比如下面的例子:

{
  "set": {
    "field": "_id"
    "value": "1"
  }
}

直接访问文档的_id 字段,并设置该值为 1

访问瞬态元字段(ingest metadata field)

这个地方并没有直接翻译官方名称,因为不好理解,我使用了我自己的理解意图去翻译该名称.其实含义是一致的,官方称之为ingest metadata field,翻译过来且为"摄取元数据字段?"(请高人翻译,我实在无法很好的翻译出来),从官方对该词的释义中可以理解出来,这些字段是临时保存的,在这些文档被处理完成之后返回给对应的批量请求和索引请求的时候,这些字段不会一并返回,因此这里称之为"瞬态元字段",下面是一个如何访问这些字段的例子:

{
  "set": {
    "field": "received"
    "value": "{ {_ingest.timestamp}}"
  }
}

该例子会在文档中临时加入一个名称为"received"的字段,值为瞬态元字段中的 timestamp,以{ {_ingest.timestamp}}的格式来进行访问,看到{ {}}是不是有种很熟悉的感觉,因为只有在涉及上下文的环境中一般才会使用这种表达式.

Processors 类型详解

  • Append Processor 追加处理器
  • Convert Processor 转换处理器
  • Date Processor 日期转换器
  • Date Index Name Processor 日期索引名称处理器
  • Fail Processor 失败处理器
  • Foreach Processor 循环处理器
  • Grok Processor Grok 处理器
  • Gsub Processor
  • Join Processor
  • JSON Processor
  • KV Processor
  • Lowercase Processor
  • Remove Processor
  • Rename Processor
  • Script Processor
  • Split Processor
  • Sort Processor
  • Trim Processor
  • Uppercase Processor
  • Dot Expander Processor

Append Processor

顾名思义,追加处理器.就是当我们加载原始文档到 Elasticsearch 的时候,某些字段的描述信息可能需要我们定制性的增加一些额外说明.

使用方法如下:

{
  "append": {
    "field": "field1",
    "value": ["item2", "item3", "item4"]
  }
}

比如我们需要从第三方新导入一批商品,因为某种原因这批新商品必须要打上一个"家居"的标签来表示该批商品是"家居分类",当然一个商品也可能不止一个分类,这个时候要求我们在导入这批文档数据的时候,需要在原有的标签字段中新增一个"家居"标签,这个时候我们就通过追加处理器来完成.

下面的例子我们通过 Simulate Pipeline API 来帮助我们完成

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "新增一个家居标签",
    "processors": [
      {
        "append": {
          "field": "tag",
          "value": [
            "家居"
          ]
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "code":"000001",
        "tag": "衣柜"
      }
    },
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "code":"000002",
        "tag": "沙发"
      }
    },
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "code":"000003"
      }
    }
  ]
}

返回结果为:

{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": {
          "code": "000001",
          "tag": [
            "衣柜",
            "家居"
          ]
        },
        "_ingest": {
          "timestamp": "2017-11-24T13:19:55.555Z"
        }
      }
    },
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": {
          "code": "000002",
          "tag": [
            "沙发",
            "家居"
          ]
        },
        "_ingest": {
          "timestamp": "2017-11-24T13:19:55.555Z"
        }
      }
    },
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": {
          "code": "000003",
          "tag": [
            "家居"
          ]
        },
        "_ingest": {
          "timestamp": "2017-11-24T13:19:55.555Z"
        }
      }
    }
  ]
}

我们可以看到,通过该处理器处理之后,新的文档中的 tag 字段都新增了一个"家居"标签,同时如果原文档没有该字段,则会新建该字段.

Convert Processor

该类型的 Processor 是用于在写入某些文档之前对该文档的字段类型进行转换,比如文档中有个字符串类型的"价格"字段,我们希望将其转换成 float 类型,则我们可以使用该处理器来完成这项操作,实现如下:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "修改字符串类型为double类型",
    "processors": [
      {
       "convert": {
         "field": "price",
         "type": "float"
       }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "code":"000001",
        "tag": "衣柜",
        "price":"999.8"
      }
    },
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "code":"000002",
        "tag": "沙发",
        "price":"899.8"
      }
    },
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "code":"000003",
        "price":"799.8"
      }
    }
  ]
}

测试结果如下:

{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": {
          "code": "000001",
          "tag": "衣柜",
          "price": 999.8
        },
        "_ingest": {
          "timestamp": "2017-11-26T10:56:47.663Z"
        }
      }
    },
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": {
          "code": "000002",
          "tag": "沙发",
          "price": 899.8
        },
        "_ingest": {
          "timestamp": "2017-11-26T10:56:47.664Z"
        }
      }
    },
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": {
          "code": "000003",
          "price": 799.8
        },
        "_ingest": {
          "timestamp": "2017-11-26T10:56:47.664Z"
        }
      }
    }
  ]
}

可以看到,新的文档,price 字段将不再是字符串类型了,而是 float 类型.

使用 Convert Processor 需要注意的是,目前官方支持转换的类型有如下几个:integer, float, string, boolean, and auto。

关于更多该处理器的使用说明,请参考: https://www.elastic.co/guide/en/elasticsearch/reference/current/convert-processor.html

Date Processor

日期处理器,顾名思义,就是将原文档中的某个日期字段转换成一个 Elasticsearch 识别的时间戳字段(一般默认为@timestamp),该时间戳字段将会新增到原文档中.(目前发现该功能比较鸡肋,因为转换后的时间格式只能是 ISO 8601 时间格式的,并不能转换成其他格式,因此很少用)

下面是一个使用 Date Processor 的例子:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "转换成指定格式的日期格式字段",
    "processors": [
      {
      "date": {
        "field": "date",
        "formats": ["UNIX"],
        "timezone": "Asia/Shanghai"
      }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "code":"000001",
        "tag": "衣柜",
        "price":"999.8",
        "date":"1511696379"
      }
    }
  ]
}

就是将原有的字符串表示的日期格式进行格式转换(最终转换成的是 ISO 时间格式),结果为:

{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": {
          "date": "1511696379",
          "code": "000001",
          "@timestamp": "2017-11-26T19:39:39.000+08:00",
          "price": "999.8",
          "tag": "衣柜"
        },
        "_ingest": {
          "timestamp": "2017-11-26T12:01:16.787Z"
        }
      }
    }
  ]
}

我们可以看到多了一个默认字段@timestamp,该字段名称可以通过 target_field 字段来指定该字段名称.

为什么说只能生成 ISO 格式的时间呢?这方面的资料官方没有提供,只能自己去源码中看,这里

public void execute(IngestDocument ingestDocument) {
        String value = ingestDocument.getFieldValue(field, String.class);

        DateTime dateTime = null;
        Exception lastException = null;
        for (Function<String, DateTime> dateParser : dateParsers) {
            try {
                dateTime = dateParser.apply(value);
            } catch (Exception e) {
                //try the next parser and keep track of the exceptions
                lastException = ExceptionsHelper.useOrSuppress(lastException, e);
            }
        }

        if (dateTime == null) {
            throw new IllegalArgumentException("unable to parse date [" + value + "]", lastException);
        }

        ingestDocument.setFieldValue(targetField, ISODateTimeFormat.dateTime().print(dateTime));
    }

其中最后一行,在设置字段的时候,发现源码中已经写死为 ISODateTimeFormat.个人感觉官方应该会自定义 pattern.

Date Index Name Processor

Date Index Name Processor 算得上是一个比较强大的处理了,它的目的是让通过该处理器的文档能够分配到符合指定时间格式的索引中,前提是按照官方提供的说明来进行使用:

我们先创建一个该类型的 pipeline,如下:

curl -XPUT 'localhost:9200/_ingest/pipeline/monthlyindex?pretty' -H 'Content-Type: application/json' -d'
{
  "description": "monthly date-time index naming",
  "processors" : [
    {
      "date_index_name" : {
        "field" : "date1",
        "index_name_prefix" : "myindex-",
        "date_rounding" : "M"
      }
    }
  ]
}
'

我们再创建一个文档,如下(该文档会使用该 pipeline)

curl -XPUT 'localhost:9200/myindex/type/1?pipeline=monthlyindex&pretty' -H 'Content-Type: application/json' -d'
{
  "date1" : "2016-04-25T12:02:01.789Z"
}
'

结果如下:

{
  "_index" : "myindex-2016-04-01",
  "_type" : "type",
  "_id" : "1",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "created" : true
}

以上请求将不会将此 document(文档)放入 myindex 索引,而是放入到 myindex-2016-04-01 索引中.

这就是日期索引名称处理器(Date Index Name Processor)强大的地方,我们写入的文档可以根据其中的某个日期格式的字段来指定该文档将写入哪个索引中,该功能配上 template,能够实现很强大的日志收集功能,比如按月按天来将日志写入 Elasticsearch.

如果想了解 Date Index Name Processor 的使用,请参考:Date Index Name Processor

Fail Processor

该处理器比较简单,就是当文档通过该 pipeline 的时候,一旦出现异常,该 pipeline 指定的错误信息就会返回给请求者.

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "monthly date-time index naming",
  "processors" : [
    {
     "fail": {
       "message": "an error message"
     }
    }
  ]
  }, 
  "docs": [
    {
      "_index": "myindex",
      "_type": "type",
      "_id": "id",
      "_source": {
        "id":"1"
      }
    }
  ]
}

结果如下:

{
  "docs": [
    {
      "error": {
        "root_cause": [
          {
            "type": "exception",
            "reason": "java.lang.IllegalArgumentException: org.elasticsearch.ingest.common.FailProcessorException: an error message",
            "header": {
              "processor_type": "fail"
            }
          }
        ],
        "type": "exception",
        "reason": "java.lang.IllegalArgumentException: org.elasticsearch.ingest.common.FailProcessorException: an error message",
        "caused_by": {
          "type": "illegal_argument_exception",
          "reason": "org.elasticsearch.ingest.common.FailProcessorException: an error message",
          "caused_by": {
            "type": "fail_processor_exception",
            "reason": "an error message"
          }
        },
        "header": {
          "processor_type": "fail"
        }
      }
    }
  ]
}

Foreach Processor

一个 Foreach Processor 是用来处理一些数组字段,数组内的每个元素都会使用到一个相同的处理器,比如

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "foreach processor",
  "processors" : [
    {
      "foreach": {
        "field": "values",
        "processor": {
          "uppercase": {
            "field": "_ingest._value"
          }
        }
      }
    }
  ]
  }, 
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "id":"1",
        "values":["hello","world","felayman"]
      }
    }
  ]
}

上面的例子中,文档中的 values 字段是一个数组类型的字段,其中每个元素都需要使用 Foreach Processor 中的一个共同的 uppercase Processor 来保证该字段中的每个元素都能执行相同的操作

结果如下:

{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": {
          "id": "1",
          "values": [
            "HELLO",
            "WORLD",
            "FELAYMAN"
          ]
        },
        "_ingest": {
          "_value": null,
          "timestamp": "2017-11-27T08:55:17.496Z"
        }
      }
    }
  ]
}

关于每个 Processor,深入下去都有许多需要说明的,这里没有深入,只是让大家了解其基本使用。

详情请参考: Foreach Processore

Grok Processor

Grok Processor 可以算的上是一个比较实用的处理器了,会经常使用到日志格式切割上,有用过 logstash 的用户应该都知道 Grok 的强大.这里并不会涉及到到 Grok 详细的语法知识.这里我们只是略过的说明 Elasticsearch 中的ingest node 中的 pipeline 也能提供 logstash 中的 Grok 的功能.

如下的使用例子:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "grok processor",
  "processors" : [
    {
      "grok": {
        "field": "message",
        "patterns": ["%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"]
      }
    }
  ]
  }, 
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
       "message": "55.3.244.1 GET /index.html 15824 0.043"
      }
    }
  ]
}

上面例子中的日志可能是 nginx 的日志,格式为:

55.3.244.1 GET /index.html 15824 0.043

对应的 Grok 语法为:

%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}

因此该文本格式的日志信息在经过 Grok Processor 之后,能够解析成一个标准文档的格式,该文档可以使用 Elasticsearch 提供的检索和聚合功能充分使用到,而原始的文本格式的日志信息则无法做到这一点

返回结果如下:

{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": {
          "duration": "0.043",
          "request": "/index.html",
          "method": "GET",
          "bytes": "15824",
          "client": "55.3.244.1",
          "message": "55.3.244.1 GET /index.html 15824 0.043"
        },
        "_ingest": {
          "timestamp": "2017-11-27T09:07:40.782Z"
        }
      }
    }
  ]
}

可以看到,对应的切割点都转换成文档中的一个字段.

关于 Grok Processor 的详细介绍,请参考:Grok Processor

Gsub Processor

Gsub Processor 能够解决一些字符串中才特有的问题,比如我想把字符串格式的日期格式如"yyyy-MM-dd HH:mm:ss"转换成"yyyy/MM/dd HH:mm:ss"的格式,我们可以借助于 Gsub Processor 来解决,而 Gsub Processor 也正是利用正则来完成这一任务的.

如:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "gsub processor",
  "processors" : [
    {
      "gsub": {
        "field": "message",
        "pattern": "-",
        "replacement": "/"
      }
    }
  ]
  }, 
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
       "message": "2017-11-27 00:00:00"
      }
    }
  ]
}

返回结果为:

{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": {
          "message": "2017/11/27 00:00:00"
        },
        "_ingest": {
          "timestamp": "2017-11-27T09:15:05.502Z"
        }
      }
    }
  ]
}

就将 message 字段中的日期格式修改成我们想要的格式了.

更多关于 Gsub Processor 的介绍,请参考 Gsub Processor

Join Processor

Join Processor 能够将原本一个数组类型的字段值,分解成以指定分隔符分割的一个字符串.

如下例子:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "join processor",
    "processors": [
      {
        "join": {
          "field": "message",
          "separator": "、、、、"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "message": ["hello","world","felayman"]
      }
    }
  ]
}

结果为:

{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": {
          "message": "hello、、、、world、、、、felayman"
        },
        "_ingest": {
          "timestamp": "2017-11-27T09:25:41.260Z"
        }
      }
    }
  ]
}

可以看到,原文档中数组格式的的 message 字段,经过 Join Processor 处理后,变成了字符串类型的字段"hello、、、、world、、、、felayman".

更多关于 Join Processor 的内容,请参考:Join Processor

JSON Processor

JSON Processor 也是用来处理字符串类型的字段,可以将那些符合 JSON 格式(或被 JSON 串化)的文本,在经过 JSON Processor 加工之后,解析成对应的 JSON 格式,如下例子:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "join processor",
    "processors": [
      {
        "json": {
          "field": "message"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "message":"{\"foo\": 2000}"
      }
    }
  ]
}

结果为:

{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": {
          "message": {
            "foo": 2000
          }
        },
        "_ingest": {
          "timestamp": "2017-11-27T09:30:28.546Z"
        }
      }
    }
  ]
}

可以看到,原本是字符串格式的字段 message,在处理之后变成了一个标准的 JSON 格式.

更多关于 JSON Processor 的内容,请参考:JSON Processor

KV Processor

KV Processor 用来 K,V 字符串格式的处理器,比如 K=V, K:V,K->V 等格式的解析.

如下例:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "kv processor",
    "processors": [
      {
        "kv": {
          "field": "message",
          "field_split":" ",
          "value_split": ":"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "message":"ip:127.0.0.1"
      }
    }
  ]
}

其中字段 message 的原始值为""ip:127.0.0.1"",我们想将该格式的内容新增一个独立的字段如 ip:127.0.0.1 这种格式.

结果为:

{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": {
          "message": "ip:107.0.0.1",
          "ip": "107.0.0.1"
        },
        "_ingest": {
          "timestamp": "2017-11-27T09:46:48.715Z"
        }
      }
    }
  ]
}

更多关于 KV Processor 的内容,请参考:KV Processor

Lowercase Processor

Lowercase Processor 也是一个专用于字符串类型的字段处理器,顾名思义,是将字符串都转换成小写格式.

如下:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "lowercase processor",
    "processors": [
      {
        "lowercase": {
          "field": "message"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "message":"HELLO,WORLD,FELAYMAN."
      }
    }
  ]
}

结果为:

{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": {
          "message": "hello,world,felayman."
        },
        "_ingest": {
          "timestamp": "2017-11-27T09:49:58.564Z"
        }
      }
    }
  ]
}

可以看到原文档中的 message 字段的值,在使用 Lowercase Processor 之后,都变成大写的了.

更多关于 Lowercase Processor 的内容,请参考:Lowercase Processor

Remove Processor

Remove Processor 是用来处理在写入文档之前,删除原文档中的某些字段值的.

如下:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "remove processor",
    "processors": [
      {
       "remove": {
         "field": "extra_field"
       }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "extra_field":"extra_field",
        "message":" hello,  felayman."
      }
    }
  ]
}

在经过 Remove Processor 处理后,extra_field 字段将不存在了.

结果为下:

{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": {
          "message": " hello,  felayman."
        },
        "_ingest": {
          "timestamp": "2017-11-27T09:58:46.038Z"
        }
      }
    }
  ]
}

Rename Processor

Rename Processor 是用来处理在文档写入 Elasticsearch 之前修改某个文档的字段的名称

如下:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "remove processor",
    "processors": [
      {
       "rename": {
         "field": "old_name",
         "target_field": "new_name"
       }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
       "old_name":"old_name"
      }
    }
  ]
}

会发现原文档的字段 old_name 被重新命名为 new_name 字段

结果为:

{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": {
          "new_name": "old_name"
        },
        "_ingest": {
          "timestamp": "2017-11-27T10:00:48.694Z"
        }
      }
    }
  ]
}

Script Processor

Script Processor 算的上是最强大的处理了,因为能充分利用 Elasticsearch 提供的脚本能力.

这里也不会详细介绍 Elasticsearch 中的脚本如何使用,有关信息,请参考:Script

我么看下在 Script Processor 中使用脚本的例子:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "script processor",
    "processors": [
      {
        "script": {
          "lang": "painless",
          "inline": "ctx.viewCount = (ctx.viewCount) *10"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "viewCount": 100
      }
    }
  ]
}

这里我们通过脚本让原文档中的 viewCount 字段的值扩大十倍,结果为:

{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": {
          "viewCount": 1000
        },
        "_ingest": {
          "timestamp": "2017-11-27T10:07:56.112Z"
        }
      }
    }
  ]
}

Set Processor

Set Processor 作用于两种不同情况:

  1. 指定字段存在时,修改指定字段的值
  2. 指定字段不存在时,新增该字段并设置该字段的值

例子如下:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "set processor",
    "processors": [
      {
       "set": {
         "field": "category",
         "value": "家居"
       }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "id":"0001"
      }
    }
  ]
}

这里我们为每个使用 Set Processor 的文档新增一个分类"家居"

结果为:

{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": {
          "id": "0001",
          "category": "家居"
        },
        "_ingest": {
          "timestamp": "2017-11-27T10:12:05.306Z"
        }
      }
    }
  ]
}

Split Processor

Split Processor 用于将一个以指定分隔分开的字符串转换成一个数组类型的字段

如下:

 POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Split processor",
    "processors": [
      {
       "split": {
         "field": "message",
         "separator": "-"
       }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "message":"hello-world"
      }
    }
  ]
}

其中原文档的 message 字段值将会有"hello-world"转换为["hello","world"]

结果为:

{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": {
          "message": [
            "hell",
            "world"
          ]
        },
        "_ingest": {
          "timestamp": "2017-11-27T10:13:35.982Z"
        }
      }
    }
  ]
}

Sort Processor

Sort Processor 用于处理数组类型的字段,可以将存储在原文档中某个数组类型的字段中的元素按照升序或降序来对原元素进行排序

如下:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "sort processor",
    "processors": [
      {
       "sort": {
         "field": "category",
         "order": "asc"
       }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "category":[2,3,4,1]
      }
    }
  ]
}

我们使用升序来修改原文档 category 字段的值的存储排序

结果为:

{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": {
          "category": [
            1,
            2,
            3,
            4
          ]
        },
        "_ingest": {
          "timestamp": "2017-11-27T10:16:16.409Z"
        }
      }
    }
  ]
}

Trim Processor

哎,Elasticsearch 开发者真的没谁了,基本上能把 String 类中的方法都拿过来搬一套过来使用.

Trim Processor 是专门用于处理字符串两端的空格问题,如下

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "trim processor",
    "processors": [
      {
       "trim": {
         "field": "message"
       }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "message":" hello,  felayman."
      }
    }
  ]
}

请注意,是去除字符串两端的空格.

结果为:

{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": {
          "message": "hello,  felayman."
        },
        "_ingest": {
          "timestamp": "2017-11-27T09:56:17.946Z"
        }
      }
    }
  ]
}

Uppercase Processor

该处理器类似于 Lowercase Processor,将字符串文本统一转换成大写.

如下:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "uppercase processor",
    "processors": [
      {
       "uppercase": {
         "field": "message"
       }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "message":"hello,world,felayman."
      }
    }
  ]
}

结果为:

{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": {
          "message": "HELLO,WORLD,FELAYMAN."
        },
        "_ingest": {
          "timestamp": "2017-11-27T09:53:33.672Z"
        }
      }
    }
  ]
}

更多关于 Uppercase Processor 的内容,请参考:Uppercase Processor

Dot Expander Processor

自定义 Processor

如果发现官方提供的 Processor 不满足我们的加工逻辑怎么办?不用担心,官方提供了很好的插件机制来帮助我们对其进行扩展,想实现一个自定义的 Processor 需要两个过程:

  • 实现 IngestPlugin 接口,并实现 IngestPlugin 中的 getProcessors 方法
  • 实现自定义的 Processor

下面我们给一个具体的例子,我们将任何传入的一个字段的值都变成大写(最简化模型,不指定字段,使用_ingest/pipeline/_simulate api 模拟该操作)

新建一个项目,名称为 elasticsearch-help,新建 FirstUpperPlugin 类,如下:

package org.elasticsearch.help;
import org.elasticsearch.help.processor.FirstUpperProcessor;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import java.util.Collections;
import java.util.Map;

/**
 * @auhthor lei.fang@shijue.me
 * @since . 2017-11-26
 */
public class FirstUpperPlugin extends Plugin implements IngestPlugin{
   

    @Override
    public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
   
        return Collections.singletonMap(FirstUpperProcessor.TYPE,new FirstUpperProcessor.Factory());
    }
}

其中我们实现了 getProcessors 方法,该方法主要用来提供我们自定义的额 Processor,其中 FirstUpperProcessor 源码如下:

package org.elasticsearch.help.processor;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import java.util.Map;

/**
 * @auhthor lei.fang@shijue.me
 * @since . 2017-11-26
 */
public class FirstUpperProcessor extends AbstractProcessor {
   

    public static final String TYPE = "firstUpper";

    private final String field;

    public FirstUpperProcessor(String tag,String field) {
   
        super(tag);
        this.field = field;
    }

    @Override
    public void execute(IngestDocument ingestDocument) throws Exception {
   
        ingestDocument.setFieldValue(field,field.toUpperCase());
    }

    @Override
    public String getType() {
   
        return TYPE;
    }

    public static final class Factory implements Processor.Factory {
   

        public FirstUpperProcessor create(Map<String, Processor.Factory> registry, String processorTag,
                                    Map<String, Object> config) throws Exception {
   
            String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
            return new FirstUpperProcessor(processorTag, field);
        }
    }

}

可以看到,其中核心的方法为 execute,主要是将传入的字段的值转换成大写,这里只是调用了简单的 toUpperCase()方法.

我们使用_ingest/pipeline/_simulate api 来进行测试:

curl -XGET 'http://localhost:9200/_ingest/pipeline/_simulate?pretty ' -d '{
  "pipeline": {
    "description": "字符串首字母转换成大写",
    "processors": [
      {
       "firstUpper":{
         "field":"message"
       }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "message":"hello,world."
      }
    }
  ]
}'

结果为:

{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "type",
        "_id": "id",
        "_source": {
            "message":"Hello,world."
        },
        "_ingest": {
          "timestamp": "2017-11-27T10:16:16.409Z"
        }
      }
    }
  ]
}

可以看到,自定义的 Ingest node 插件成功了.

完结.

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
10月前
|
存储 机器学习/深度学习 自然语言处理
大数据数据存储的搜索引擎Elasticsearch的基本操作(含API使用)的基本聚合的Pipeline聚合
大数据数据存储的搜索引擎Elasticsearch是一种常用的全文搜索引擎,支持对文本数据的实时搜索和分析。
73 0
|
消息中间件 分布式计算 大数据
Elasticsearch生态&技术峰会 | Elasticsearch基于Pipeline窗口函数实现实时聚合计算
开源最大的特征就是开放性,云生态则让开源技术更具开放性与创造性,Elastic 与阿里云的合作正是开源与云生态共生共荣的典范。值此合作三周年之际,我们邀请业界资深人士相聚云端,共话云上Elasticsearch生态与技术的未来。
1099 0
Elasticsearch生态&技术峰会 | Elasticsearch基于Pipeline窗口函数实现实时聚合计算
|
7天前
|
数据可视化 索引
elasticsearch head、kibana 安装和使用
elasticsearch head、kibana 安装和使用
|
19天前
|
存储 负载均衡 索引
linux7安装elasticsearch-7.4.0集群配置
linux7安装elasticsearch-7.4.0集群配置
108 0
|
2月前
|
存储 监控 搜索推荐
在生产环境中部署Elasticsearch:最佳实践和故障排除技巧——安装篇(一)
在生产环境中部署Elasticsearch:最佳实践和故障排除技巧——安装篇(一)
|
4月前
ElasticSearch-Head浏览器插件离线安装
ElasticSearch-Head浏览器插件离线安装
90 0
|
6天前
|
JSON Unix Linux
Elasticsearch如何安装
Elasticsearch如何安装
|
3月前
|
前端开发 安全 Ubuntu
Elasticsearch安装和配置
Elasticsearch安装和配置
116 0
|
4月前
|
数据可视化 Docker 容器
elasticsearch 安装(一)
elasticsearch 安装
160 0
|
1月前
|
监控 安全 Java
ElasticSearch在Windows上的下载与安装
ElasticSearch在Windows上的下载与安装

相关产品

  • 检索分析服务 Elasticsearch版