ES Pipeline Aggregation(管道聚合)

简介: ES Pipeline Aggregation(管道聚合)

管道聚合处理来自其他聚合而不是文档集的输出,将信息添加到输出树中。


注:关于脚本聚合目前在本文中暂时不会涉及。


主要有如下两种管道聚合方式:


  • parent
  • sibling


下面一一介绍ES定义的管道聚合。


image.png

同级管道聚合,它计算同级聚合中指定度量的平均值。同级聚合必须是多桶聚合,针对的是度量聚合(metric Aggregation)。


示例如下:

1{
2    "avg_bucket": {
3        "buckets_path": "the_sum"  // @1
4    }
5}
  • buckets_path:指定聚合的名称,支持多级嵌套聚合。
    其他参数:
  • gap_policy
    当管道聚合遇到不存在的值,有点类似于term等聚合的(missing)时所采取的策略,可选择值为:skip、insert_zeros。
  • skip:此选项将丢失的数据视为bucket不存在。它将跳过桶并使用下一个可用值继续计算。
  • insert_zeros:默认使用0代替。
  • format
    用于格式化聚合桶的输出(key)。


示例如下:

1POST /_search
 2{
 3  "size": 0,
 4  "aggs": {
 5    "sales_per_month": {                  // @1
 6           "date_histogram": {
 7            "field": "date",
 8            "interval": "month"
 9      },
10      "aggs": {                                    // @2
11        "sales": {
12          "sum": {
13            "field": "price"
14          }
15        }
16      }
17    },
18    "avg_monthly_sales": {             // @3
19      "avg_bucket": {
20        "buckets_path": "sales_per_month>sales" 
21      }
22    }
23  }
24}

代码@1:首先定义第一级聚合(按月)直方图聚合。


代码@2:定义第二级聚合,在按月聚合的基础上,对每个月的文档求sum。


代码@3:对上面的聚合求平均值。


其返回结果如下:

1{
 2    ... // 省略
 3   "aggregations": {
 4      "sales_per_month": {
 5         "buckets": [
 6            {
 7               "key_as_string": "2015/01/01 00:00:00",
 8               "key": 1420070400000,
 9               "doc_count": 3,
10               "sales": {
11                  "value": 550.0
12               }
13            },
14            {
15               "key_as_string": "2015/02/01 00:00:00",
16               "key": 1422748800000,
17               "doc_count": 2,
18               "sales": {
19                  "value": 60.0
20               }
21            }
22         ]
23      },
24      "avg_monthly_sales": {   // 这是对二级聚合的结果再进行一次求平均值聚合。
25          "value": 328.33333333333333
26      }
27   }
28}

对应的JAVA示例如下:

1public static void test_pipeline_avg_buncket_aggregation() {
 2        RestHighLevelClient client = EsClient.getClient();
 3        try {
 4            SearchRequest searchRequest = new SearchRequest();
 5            searchRequest.indices("aggregations_index02");
 6            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
 7            AggregationBuilder aggregationBuild = AggregationBuilders.terms("seller_agg")
 8                                                        .field("sellerId")
 9                                                        .subAggregation(AggregationBuilders.sum("seller_num_agg")
10                                                                            .field("num")
11                                                        )
12                                                  ;
13            sourceBuilder.aggregation(aggregationBuild);
14
15            // 添加 avg bucket pipeline
16            sourceBuilder.aggregation(new AvgBucketPipelineAggregationBuilder("seller_num_agg_av", "seller_agg>seller_num_agg"));
17            sourceBuilder.size(0);
18
19            searchRequest.source(sourceBuilder);
20            SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
21            System.out.println(result);
22        } catch (Throwable e) {
23            e.printStackTrace();
24        } finally {
25            EsClient.close(client);
26        }
27    }


image.png

同级管道聚合,百分位管道聚合。其JAVA示例如下:


1public static void test_Percentiles_buncket_aggregation() {
 2        RestHighLevelClient client = EsClient.getClient();
 3        try {
 4            SearchRequest searchRequest = new SearchRequest();
 5            searchRequest.indices("aggregations_index02");
 6            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
 7            AggregationBuilder aggregationBuild = AggregationBuilders.terms("seller_agg")
 8                                                        .field("sellerId")
 9                                                        .subAggregation(AggregationBuilders.sum("seller_num_agg")
10                                                                            .field("num")
11                                                        )
12                                                  ;
13            sourceBuilder.aggregation(aggregationBuild);
14
15            // 添加 avg bucket pipeline
16            sourceBuilder.aggregation(new PercentilesBucketPipelineAggregationBuilder("seller_num_agg_av", "seller_agg>seller_num_agg"));
17            sourceBuilder.size(0);
18
19            searchRequest.source(sourceBuilder);
20            SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
21            System.out.println(result);
22        } catch (Throwable e) {
23            e.printStackTrace();
24        } finally {
25            EsClient.close(client);
26        }
27    }

其返回值如下:

1{
 2  ...  // 省略其他属性
 3    "aggregations":{
 4        "lterms#seller_agg":{
 5            "doc_count_error_upper_bound":0,
 6            "sum_other_doc_count":12,
 7            "buckets":[
 8                {
 9                    "key":45,
10                    "doc_count":567,
11                    "sum#seller_num_agg":{
12                        "value":911
13                    }
14                },
15                {
16                    "key":31,
17                    "doc_count":324,
18                    "sum#seller_num_agg":{
19                        "value":353
20                    }
21                } // 省略其他桶的显示
22            ]
23        },
24        "percentiles_bucket#seller_num_agg_av":{
25            "values":{
26                "1.0":5,
27                "5.0":5,
28                "25.0":10,
29                "50.0":20,
30                "75.0":290,
31                "95.0":911,
32                "99.0":911
33            }
34        }
35    }
36}


image.png

累积管道聚合,就是就是依次将每个管道的sum聚合进行累加。


其语法(restfull)如下:

1{
2    "cumulative_sum": {
3        "buckets_path": "the_sum"
4    }
5}

支持的参数说明:


  • buckets_path
    桶聚合名称,作为管道聚合的输入信息。
  • format
    格式化key。


使用示例如下:


1POST /sales/_search
 2{
 3    "size": 0,
 4    "aggs" : {
 5        "sales_per_month" : {
 6            "date_histogram" : {
 7                "field" : "date",
 8                "interval" : "month"
 9            },
10            "aggs": {
11                "sales": {
12                    "sum": {
13                        "field": "price"
14                    }
15                },
16                "cumulative_sales": {
17                    "cumulative_sum": {
18                        "buckets_path": "sales" 
19                    }
20                }
21            }
22        }
23    }
24}

其返回结果如下:

1{
 2   "took": 11,
 3   "timed_out": false,
 4   "_shards": ...,
 5   "hits": ...,
 6   "aggregations": {
 7      "sales_per_month": {
 8         "buckets": [
 9            {
10               "key_as_string": "2015/01/01 00:00:00",
11               "key": 1420070400000,
12               "doc_count": 3,
13               "sales": {
14                  "value": 550.0
15               },
16               "cumulative_sales": {
17                  "value": 550.0
18               }
19            },
20            {
21               "key_as_string": "2015/02/01 00:00:00",
22               "key": 1422748800000,
23               "doc_count": 2,
24               "sales": {
25                  "value": 60.0
26               },
27               "cumulative_sales": {
28                  "value": 610.0
29               }
30            },
31            {
32               "key_as_string": "2015/03/01 00:00:00",
33               "key": 1425168000000,
34               "doc_count": 2,
35               "sales": {
36                  "value": 375.0
37               },
38               "cumulative_sales": {
39                  "value": 985.0
40               }
41            }
42         ]
43      }
44   }
45}

从结果可知,cumulative_sales的值等于上一个cumulative_sales + 当前桶的sum聚合。

对应的JAVA示例如下:

1{
 2    "aggregations":{
 3        "date_histogram#createTime_histogram":{
 4            "buckets":{
 5                "2015-12-01 00:00:00":{
 6                    "key_as_string":"2015-12-01 00:00:00",
 7                    "key":1448928000000,
 8                    "doc_count":6,
 9                    "sum#seller_num_agg":{
10                        "value":16
11                    },
12                    "simple_value#Cumulative_Seller_num_agg":{
13                        "value":16
14                    }
15                },
16                "2016-01-01 00:00:00":{
17                    "key_as_string":"2016-03-01 00:00:00",
18                    "key":1456790400000,
19                    "doc_count":10,
20                    "sum#seller_num_agg":{
21                        "value":11
22                    },
23                    "simple_value#Cumulative_Seller_num_agg":{
24                        "value":31
25                    }
26                }
27                // ... 忽略
28            }
29        }
30    }
31}


image.png

一种父管道聚合,它对其父多桶聚合的桶进行排序。并可以指定多个排序字段。每个bucket可以根据它的_key、_count或子聚合进行排序。此外,可以设置from和size的参数,以便截断结果桶。


使用语法如下:

1{
 2    "bucket_sort": {
 3        "sort": [
 4            {"sort_field_1": {"order": "asc"}},
 5            {"sort_field_2": {"order": "desc"}},
 6            "sort_field_3"
 7        ],
 8        "from": 1,
 9        "size": 3
10    }
11}

支持的参数说明如下:


  • sort
    定义排序结构。
  • from
    用与对父聚合的桶进行截取,该值之前的所有桶将忽略,也就是不参与排序,默认为0。
  • size
    返回的桶数。默认为父聚合的所有桶。
  • gap_policy
    当管道聚合遇到不存在的值,有点类似于term等聚合的(missing)时所采取的策略,可选择值为:skip、insert_zeros。
  • skip:此选项将丢失的数据视为bucket不存在。它将跳过桶并使用下一个可用值继续计算。
  • insert_zeros:默认使用0代替。


官方示例如下:


1POST /sales/_search
 2{
 3    "size": 0,
 4    "aggs" : {
 5        "sales_per_month" : {
 6            "date_histogram" : {
 7                "field" : "date",
 8                "interval" : "month"
 9            },
10            "aggs": {
11                "total_sales": {
12                    "sum": {
13                        "field": "price"
14                    }
15                },
16                "sales_bucket_sort": {
17                    "bucket_sort": {
18                        "sort": [
19                          {"total_sales": {"order": "desc"}}
20                        ],
21                        "size": 3
22                    }
23                }
24            }
25        }
26    }
27}

对应的JAVA示例如下:

1public static void test_bucket_sort_Aggregation() {
 2        RestHighLevelClient client = EsClient.getClient();
 3        try {
 4
 5            //构建日期直方图聚合  时间间隔,示例中按月统计
 6            DateHistogramInterval interval = new DateHistogramInterval("1M"); 
 7            SearchRequest searchRequest = new SearchRequest();
 8            searchRequest.indices("aggregations_index02");
 9            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
10            AggregationBuilder aggregationBuild = AggregationBuilders.dateHistogram("createTime_histogram")
11                                                                        .field("createTime")
12                                                                        .dateHistogramInterval(interval)
13                                                                        .keyed(true)
14                                                                        .subAggregation(AggregationBuilders.sum("seller_num_agg")
15                                                                                .field("num")
16                                                                        )
17                                                                        .subAggregation(new  BucketSortPipelineAggregationBuilder("seller_num_agg_sort", Arrays.asList(
18                                                                                new FieldSortBuilder("seller_num_agg").order(SortOrder.ASC)))
19                                                                                .from(0)
20                                                                                .size(3))
21                                                                        //  BucketSortPipelineAggregationBuilder(String name, List<FieldSortBuilder> sorts)
22                                                                        .subAggregation(new CumulativeSumPipelineAggregationBuilder("Cumulative_Seller_num_agg", "seller_num_agg"))
23                                                                    //  .format("yyyy-MM-dd") // 对key的格式化
24                                                  ;
25            sourceBuilder.aggregation(aggregationBuild);
26            sourceBuilder.size(0);
27            sourceBuilder.query(
28                    QueryBuilders.termQuery("sellerId", 24)
29            );
30            searchRequest.source(sourceBuilder);
31            SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
32            System.out.println(result);
33        } catch (Throwable e) {
34            e.printStackTrace();
35        } finally {
36            EsClient.close(client);
37        }
38    }

返回值:

1{
 2    "aggregations":{
 3        "date_histogram#createTime_histogram":{
 4            "buckets":{
 5                "2016-04-01 00:00:00":{
 6                    "key_as_string":"2016-04-01 00:00:00",
 7                    "key":1459468800000,
 8                    "doc_count":2,
 9                    "sum#seller_num_agg":{
10                        "value":2
11                    },
12                    "simple_value#Cumulative_Seller_num_agg":{
13                        "value":2
14                    }
15                },
16                "2017-05-01 00:00:00":{
17                    "key_as_string":"2017-05-01 00:00:00",
18                    "key":1493596800000,
19                    "doc_count":3,
20                    "sum#seller_num_agg":{
21                        "value":3
22                    },
23                    "simple_value#Cumulative_Seller_num_agg":{
24                        "value":5
25                    }
26                },
27                "2017-02-01 00:00:00":{
28                    "key_as_string":"2017-02-01 00:00:00",
29                    "key":1485907200000,
30                    "doc_count":4,
31                    "sum#seller_num_agg":{
32                        "value":4
33                    },
34                    "simple_value#Cumulative_Seller_num_agg":{
35                        "value":9
36                    }
37                }
38            }
39        }
40    }


image.png

与 avg类似。

image.png

与 avg类似。

image.png

与 avg类似。

image.png

与 avg类似。

相关文章
|
8月前
|
流计算
Flink 多个stream合并聚合
Flink 多个stream合并聚合
126 0
Flink 多个stream合并聚合
|
7月前
|
存储 缓存 自然语言处理
elasticsearch 聚合 : 指标聚合、桶聚合、管道聚合解析使用总结
elasticsearch 聚合 : 指标聚合、桶聚合、管道聚合解析使用总结
|
数据建模
白话Elasticsearch59-数据建模实战_ Nested Aggregation/ Reverse nested Aggregation对嵌套的博客评论数据进行聚合分析
白话Elasticsearch59-数据建模实战_ Nested Aggregation/ Reverse nested Aggregation对嵌套的博客评论数据进行聚合分析
99 0
|
缓存 自然语言处理 数据挖掘
白话Elasticsearch50-深入聚合数据分析之基于doc values正排索引的聚合内部原理
白话Elasticsearch50-深入聚合数据分析之基于doc values正排索引的聚合内部原理
116 0
|
存储
ES聚合查询详解(四):管道聚合
ES聚合查询详解(四):管道聚合
568 0
ES聚合查询详解(四):管道聚合
|
SQL NoSQL JavaScript
mongo 进阶之——聚合管道
上面这句话的意思是,先用pumber来进行分组,会有两个字段,一个是"_id"和"count",在后一个管道中用1表示显示,0表示不显示
mongo 进阶之——聚合管道
|
NoSQL MongoDB
MongoDB 聚合管道(Aggregation Pipeline)
MongoDB 聚合管道(Aggregation Pipeline)
276 0
MongoDB 聚合管道(Aggregation Pipeline)
|
存储 缓存 NoSQL
ES通过 Enrich Processor 的 Ingest Pipeline 实现关系数据库中的表关联(join)操作
ES通过 Enrich Processor 的 Ingest Pipeline 实现关系数据库中的表关联(join)操作
ES通过 Enrich Processor 的 Ingest Pipeline 实现关系数据库中的表关联(join)操作
|
大数据 开发者
聚合操作_多维聚合_rollup | 学习笔记
快速学习聚合操作_多维聚合_rollup
聚合操作_多维聚合_rollup | 学习笔记
|
分布式计算 大数据 Spark
聚合操作_多维聚合_rollup 案例 | 学习笔记
快速学习聚合操作_多维聚合_rollup 案例
聚合操作_多维聚合_rollup 案例 | 学习笔记