前言
接着上次写剩下的查询继续学习。
Flink SQL 查询
环境准备:
# 1. 先启动 hadoop myhadoop start # 2. 不需要启动 flink 只启动yarn-session即可 /opt/module/flink-1.17.0/bin/yarn-session.sh -d # 3. 启动 flink sql 的环境 sql-client ./sql-client.sh embedded -s yarn-session
记得第二步:启动 yarn-seesion !!!
注意:我们写 SQL 的时候尽量避免关键字,比如函数名(avg、sum)!
1、分组窗口聚合
分组窗口起始就是我们之前学过的 滑动窗口、会话窗口、滚动窗口,之所以叫它分组窗口,其实是把它的一个窗口看做一个分组。
从1.13版本开始,分组窗口聚合已经标记为过时,鼓励使用更强大、更有效的窗口TVF聚合,在这里简单做个介绍。
直接把窗口自身作为分组key放在GROUP BY之后的,所以也叫“分组窗口聚合”。SQL查询的分组窗口是通过 GROUP BY 子句定义的。类似于使用常规 GROUP BY 语句的查询,窗口分组语句的 GROUP BY 子句中带有一个窗口函数为每个分组计算出一个结果。
Flink SQL中只支持基于时间的窗口,不支持基于元素个数的窗口。
分组窗口函数 |
描述 |
TUMBLE(time_attr, interval) |
定义一个滚动窗口。time_attr 是时间属性,也就是你选择的作为时间语义的字段,interval 是窗口长度。 |
HOP(time_attr, interval, interval) |
定义一个跳跃的时间窗口(在 Table API 中称为滑动窗口)。滑动窗口的长度( 第二个 interval 参数 )以及一个滑动的步长(第一个 interval 参数 )。 |
SESSION(time_attr, interval) |
定义一个会话时间窗口。interval 是会话的间隔。 |
其中,ROWTIME 代表的是事件时间语义,PROCTIME 是处理时间语义。
1)准备数据
CREATE TABLE ws ( id INT, vc INT, pt AS PROCTIME(), --处理时间 et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件时间 WATERMARK FOR et AS et - INTERVAL '5' SECOND --watermark ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.id.min' = '1', 'fields.id.max' = '3', 'fields.vc.min' = '1', 'fields.vc.max' = '100' ); # 设置显示 set sql-client.execution.result-mode=tableau;
我们可以看到,一张 flink sql 的表是允许事件时间和处理时间都存在的,只是水位线只能指定一个。
2)滑动窗口示例(时间属性,窗口长度)
Flink SQL> select id, > sum(vc) as vcSum, > tumble_start(et,interval '5' second) as window_start, > tumble_end(et,interval '5' second) as window_end > from ws > group by id,tumble(et,interval '5' second);
查询结果:
可以看到每个 key 的窗口每 5s 滚动一次。
3)滑动窗口(时间属性,滑动步长,窗口长度)
Flink SQL> select id, > hop_start(et,interval '3' second,interval '5' second) window_start, > hop_end(et,interval '3' second,interval '5' second) window_end, > sum(vc) sum_vc > from ws > group by id,hop(et,interval '3' second,interval '5' second);
这里我们指定滑动步长为 3 窗口大小为 5,查看运行结果:
我们可以看到,相同 key 的窗口确实间隔 3s,窗口大小为 5s。
3)会话窗口(时间属性,会话间隔)
select id, SESSION_START(et, INTERVAL '5' SECOND) wstart, SESSION_END(et, INTERVAL '5' SECOND) wend, sum(vc) sumVc from ws group by id, SESSION(et, INTERVAL '5' SECOND);
这里因为我们的数据生成器是源源不断生成数据的,而我们指定了会话间隔为 5s,也就是说只有连续 5s 收不到数据窗口才会关闭,所以我们是看不到数据结果的。
2、窗口表值函数(TVF)聚合
前面我们学习的 Group Window 在 Flink13 版本之后已经被标记为过时了,更推荐的是 TVF Window。
对比GroupWindow,TVF窗口更有效和强大。包括:
- 提供更多的性能优化手段
- 支持GroupingSets语法
- 可以在window聚合中使用TopN
- 提供累积窗口
对于窗口表值函数(TVF),窗口本身返回的是就是一个表,所以窗口会出现在FROM后面,GROUP BY后面的则是窗口新增的字段 window_start 和 window_end。
FROM TABLE( 窗口类型(TABLE 表名, DESCRIPTOR(时间字段),INTERVAL 时间间隔) ) GROUP BY [window_start,][window_end,] --可选
1)滚动窗口(tumble)
Flink SQL> select > id, > sum(vc) as vc_sum, > window_start, > window_end > from table( -- 这里的table是关键字 不是表名 > tumble(table ws,descriptor(et),interval '5' second) > ) > group by window_start,window_end,id; -- 这里的window_start,window_end都是关键字
运行结果:
2)滑动窗口(hop)
注意:在 TVF 中,滑动窗口的大小必须是步长的整数倍,因为 TVF 会对滑动窗口进行一个优化:把滑动窗口按照步长大小划分为 (窗口大小/步长)个滚动窗口,这样一些需要重复计算的数据所在的滚动窗口只需要计算一次即可。
如果不是整数倍,会报错:[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: HOP table function based aggregate requires size must be an integral multiple of slide, but got size xxx ms and slide xxx ms
Flink SQL> select > id, > sum(vc) as vc_sum, > window_start, > window_end > from table( > hop(table ws,descriptor(et),interval '2' second,interval '6' second) > ) > group by window_start,window_end,id;
我们上面这个滑动窗口会被 TVF 划分为 6/2 也就是 3 个,其中第一个窗口和第二个窗口之间的会有两个重复的计算结果(也就是两个子滚动窗口)。
3)累积窗口(cumulate)
注意:累积窗口的窗口大小必须是步长的整数倍!
Flink SQL> select > id, > sum(vc) vc_sum, > window_start, > window_end > from table( > cumulate(table ws,descriptor(et),interval '2' second,interval '6' second > )) > group by window_start,window_end,id;
累积窗口也很常用,需要指定两个参数(窗口的最大长度,窗口的步长),比如统计一天内每个小时的网站访问量就可以开一个累积窗口(窗口的最大长度是 24h,窗口的步长是 1h),语法也很简单,只需要修改一个滑动窗口的一个关键字:cumulate。当窗口结束后会再开启一个窗口,属性和上一个窗口是一样的。
累积窗口的底层还是一个滚动窗口,当我们定义了一个累积窗口,就相当于开了一个最大的滚动窗口,之后会根据用户指定的步长(也就是触发计算的时间)将这个窗口划分为多个窗口,这些窗口具有相同的起点和不同的终点。
4)grouping sets 多维分析
SELECT window_start, window_end, id , SUM(vc) sumVC FROM TABLE( TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS)) GROUP BY window_start, window_end, rollup( (id) ) -- cube( (id) ) -- grouping sets( (id),() ) ;
这个多维分析用到的时候再详细了解,这里只知道Flink有这个功能就行了。
5)会话窗口
Flink 17 的 TVF 函数窗口其实是支持会话窗口的,但是现在还是实验性的功能。
3、Over 聚合
回想我们之前 Hive 学过的开窗函数,对每个数据,我们会去计算它的开窗范围进行计算输出。比如对上一行到这一行的数据进行计算;再比如计算第一行数据到当前行。
OVER 聚合为一系列有序行的每个输入行计算一个聚合值。与 GROUP BY 聚合相比,OVER聚合不会将每个组的结果行数减少为一行。相反,OVER聚合为每个输入行生成一个聚合值。
可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定义Over windows。
1)语法:
SELECT -- 聚合函数 agg_func(agg_col) OVER ( [PARTITION BY col1[, col2, ...]] ORDER BY time_col range_definition), ... FROM ...
- ORDER BY:必须是时间戳列(事件时间、处理时间),只能升序
- PARTITION BY:标识了聚合窗口的聚合粒度
- range_definition:这个标识聚合窗口的聚合数据范围,在 Flink 中有两种指定数据范围的方式。第一种为按照行数聚合(rows 关键字),第二种为按照时间区间聚合(range 关键字)。
- 按照时间分区时,对于同一时刻的数据都会被聚合计算,但是按照行数聚合就不会。
2)案例
1. 按照时间区间聚合
统计每个传感器前 10 秒到现在收到的水位数据条数
select id, et, vc, count(vc) over(partition by id order by et range between interval '10' second preceding and current row) cnt from ws;
运行结果:
也可以用 WINDOW 子句来在SELECT外部单独定义一个OVER窗口,可以多次使用:
select id, et, vc, count(vc) over w as cnt, -- 这里的 as 可以省略 sum(vc) over w as sum_vc from ws window w as( partition by id order by et range between interval '10' second preceding and current row );
当我们有多个聚合函数并且这几个窗口函数的开窗属性是相同的时候这样写可以简化代码。
2. 按照行数聚合
统计每个传感器前5条到现在数据的平均水位:
select id, et, vc, avg(vc) over w as avg from ws window w as ( partition by id order by et rows between 5 preceding and current row );
运行结果:
Flink(十四)【Flink SQL(中)查询】(2)https://developer.aliyun.com/article/1532320