《Elastic Stack 实战手册》——三、产品能力——3.5 进阶篇——3.5.13.Transform (1) https://developer.aliyun.com/article/1228178
前期准备
在开始使用 Transforms 功能前我们必须要确保一下3件事:
1、必须要个适当的许可证(License),目前 Transforms 属于免费功能,只要有 Basic License 即可;
2、在 Elasticsearch 集群中必须至少有1个transform角色的节点,节点角色可以通过elasticsearch.yml中的node.roles变量配置;
3、如果集群使用了安全控制模块,那么需要操作用户具有内置角色以及特定权限;
l 如果想查看 Transforms 任务的配置或状态以下条件需至少满足1条:
○ 用户具体内置角色 —— transform_user
○ 用户某个自定义角色具有权限 —— monitor_transform
l 如果想管理 Transforms 任务则必须满足以下所有条件:
○ 用户具有transform_admin内置角色或其角色具有manage_transform权限
○ 源索引的read和view_index_metadata权限
○ 目标索引的read、create_index和index权限
任务初始化及开始
在做完上述准备工作后我们就可以创建 Transforms 任务了。示例如下:
PUT _transform/<transform_id>?defer_validation #1 { "source": { # 2 "index": "kibana_sample_data_ecommerce", "query": { "term": { "geoip.continent_name": { "value": "Asia" } } } }, "pivot": { # 3 "group_by": { # 3.1 "customer_id": { "terms": { "field": "customer_id" } } }, "aggregations": { #3.2 "max_price": { "max": { "field": "taxful_total_price" } } } }, "description": "Maximum priced ecommerce data by customer_id in Asia", # 4 "dest": { #5 "index": "kibana_sample_data_ecommerce_transform", "pipeline": "add_timestamp_pipeline" }, "frequency": "5m", #6 "settings": { #7 "docs_per_second": 100, "max_page_search_size": 500 } "sync": { #8 "time": { "field": "order_date", "delay": "60s" } } }
如#1所示,我们通过发送一个 PUT 请求来创建 Transforms 任务,发送的Path是 _transform/ 其中 为用户自定义的任务 ID,请求还可以带一个查询参数—— defer_validation,如果该参数存在,则创建任务时执行参数校验。一般源索引在 Transforms 任务创建后才创建的情况需要使用此参数。
创建任务的请求体主要包括以下6个部分:
1、source(#2):必要的对象,用于配置执行计算的数据源,该部分由index索引和query查询组成。
l 索引可以是单值也可以是数组,同时也支持通配符。
l 查询是可选对象,当每次计算仅需要索引部分数据时,则可在此处配置1个 DSL 查询语句
2、pivot(#3):必要的对象,用于配置 Transform 任务的具体计算逻辑,该分部主要由group_by分桶逻辑(#3.1)和aggregations聚合逻辑(#3.2)组成。本例子表示按用户 ID 分组计算每个用户总消费量(税前)。
l group_by分桶逻辑表示 Transforms 任务聚合数据如何分桶,配置复用分桶聚合(Bucket aggregations)API 中的相关配置,具体有以下4种:
○ Term分桶,最常用策略,根据属性所有枚举值分桶。
○ 区间分桶(histogram),对于数值类属性,按照指定跨度分桶。
○ 日期区间分桶(date_histogram),对于时间类属性,提供按年、月、日、周、时、分、秒等跨度分桶。
○ 地理位置分桶(Geotile Grid),对于地理位置类(geo_point)属性,按照指定网格分桶
l aggregations聚合逻辑用于配置对于落入各个分桶中的数据如何计算的汇总数据,该配置符合 指标聚合(Metrics aggregations)API中的相关配置,具体支持以下16种:
○ 均值聚合(Average),对于数值型属性求其平均值。
○ 桶脚本聚合(bucket script),该种聚合可以将其他聚合值作为参数放在脚本中计算出1个新值。
○ 桶筛选聚合(bucket selector),该种聚合可以通过脚本设置1个筛选器作用于其他聚合使得结果更符合业务需求。
○ 基数聚合(Cardinality),该聚合用于统计去重后的数量,该结果为近似值。
○ 过滤聚合(filter),用于给聚合增加1个过滤条件。
○ 地理边界聚合(Geo bounds),该聚合结果是覆盖桶内所有数据的方形地理范围。
○ 地理质心聚合(Geo centroid),该聚合的作用是对桶内所有坐标值计算加权质心(Centroid)。
○ 最大值(Max),对于数值型属性求桶内最大值。
○ 最小值(Min),对于数值型属性求桶内最小值。
○ 百分位聚合(Percentiles),计算从聚合文档中提取的数值的一个或多个百分位数。如计算响应时长的5分位、7分位、9分位值。
○ 稀有Terms聚合(Rare Terms),用于统计长尾分布或不常出现的Term。
○ 脚本指标聚合(Scripted metric),使用脚本计算聚合指标。
○ 求和(Sum),对于桶内某数值型属性求和。
○ Terms聚合,计算分桶内某属性每个Term涉及的文档数。
○ 值计数聚合(Value count),计算从文档中属性或脚本计算的值的数量。功能类似SQL的 count关键字。
○ 加权平均聚合(Weighted average),对于数值属性,加权后再计算平局值。
3、description(#4),可选,字符,用于保存任务的说明。
4、dest(#5),必要,对象,用于配置目标索引,index字段配目标索引名,如果索引不存在则会自动创建,输出时也可以加1个管道,用pipeline配置管道 ID。
5、frequency(#6),可选,时间单位, Transforms 连续运行时检查源索引变化的间隔。该时间同时表示查询或写入出错时重试的时间间隔,最小值为1s,最大值为1h。默认值为1m。
6、settings(#7),可选,对象,用于配置 Transforms 任务特性,适当的特性能减轻集群压力,目前可设置的特性有2个:
l docs_per_second:指定每秒写入新索引文档数的限制,此设置通过在搜索请求之间添加等待时间来限制,默认为null不限制。
l max_page_search_size:定义每次任务执行时的复合聚合的初始分页大小。如果发生器异常
l ,分页大小会动态调整为较低的值。最小值为10,最大值为10,000。默认值为500。
7、sync(#7),可选,对象,该对象用于配置 Transforms 持续同步的策略,策略使用checkpoint机制。如果不配则 Transforms 任务为一次性任务。目前ES只支持基于时间属性配置测录,所以该配置必须配1个名为time的对象,该对象有2个属性。
l field,必填字段,需配置1个日期类型属性,该属性用于识别源索引中的新文档,推荐使用es内置的时间戳字段 _ingest.timestamp。
l delay,可选,用于配置1个时间间隔,用户需要保证在该时间间隔内文档能写入完成并可被搜索默认60秒。
创建完任务后,我们还需要执行开始任务的命令,让任务实际运行。当开始 Transforms 任务时,如果目标索引不存在,则会自动创建,创建时mapping会根据任务配置自行推断生成,并将number_of_shards被设置为1,并将 auto_expand_replicas设置为0-1。如果索引匹配某映射模板(mapping templates)则会使用模板的配置。正确的做法是目的索引应根据任务提前创建好索引。如果您在创建转换时推迟验证,转换开始时,会进行一系列验证以确保其成功。开始任务的API如下,其中 为任务 ID。
POST _transform/<transform_id>/_start
《Elastic Stack 实战手册》——三、产品能力——3.5 进阶篇——3.5.13.Transform (3) https://developer.aliyun.com/article/1228174