FlinkSQL窗口新特性(Window TVF)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 理解Window TVFWindow TVF使用

理解Window TVF


表值函数(table-valued function, TVF),顾名思义就是指返回值是⼀张表的函数,是不是很神奇,在Oracle、SQL Server等数据库中屡⻅不鲜。⽽在Flink的上⼀个稳定版本1.13中,社区通过FLIP-145提出了窗⼝表值函数(window TVF)的实现,⽤于替代旧版的窗⼝分组(group window)语法



注意:

  • Window TVF⼀个重要的特性是窗⼝定义函数返回的⼀张表,且包含了window_start, window_end,window_time三个字段分别代表窗⼝开始时间戳、结束时间戳、窗⼝时间戳;

  • window_time是窗⼝ 往下发送数据的时间属性,等于window_end - 1毫秒。

  • 特别注意:window_start 和 window_end 列是常规时间戳列,⽽不是时间属性。 因此它们不能⽤作后续基于时间 的操作中的时间属性。

  • 为了往下游操作传播时间属性,您需要在 GROUP BY ⼦句中额外添加 window_time 列。 window_time 是 Windowing TVF ⽣成的,它是窗⼝的时间属性。 然后后⾯的查询可以使⽤该列进⾏后续的基于时 间的操作(不需要再设置时间属性列;额),例如级联窗⼝聚合和窗⼝TopN。



我们可以通过如下链接来理解⼀下:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%253A+Support+SQL+windowing+table-valued+function#FLIP145:SupportSQLwindowingtablevaluedfunction-TumblingWindows



实例



准备⼀个数据⽂件~/test/bid/bids⽂件,⾥⾯存储的是供应商报价信息



2020-04-1508:05,4.00,C,supplier12020-04-1508:07,2.00,A,supplier12020-04-1508:09,5.00,D,supplier22020-04-1508:11,3.00,B,supplier22020-04-1508:13,1.00,E,supplier12020-04-1508:17,6.00,F,supplier2


CREATETABLEbid (
bidtimeTIMESTAMP(3),
priceDECIMAL(10, 2),
itemSTRING,
supplier_idSTRING,
WATERMARKFORbidtimeASbidtime-INTERVAL'1'SECOND) WITH (
'connector'='filesystem',
'path'='/home/hadoop/test/bid',
'format'='csv');
descbid;
SELECT*FROMbid;

滚动窗口

image.png




SELECTwindow_start, window_end, SUM(price) aspriceFROMTABLE(
TUMBLE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end;
--也可以根据其他业务字段groupbySELECTwindow_start, window_end, supplier_id,SUM(price) aspriceFROMTABLE(
TUMBLE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYsupplier_id,window_start, window_end;
--还可以获取window_timeSELECTwindow_start, window_end,window_time, SUM(price) aspriceFROMTABLE(
TUMBLE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end,window_time;


>SELECT*FROMBid;
--------------------------|bidtime|price|item|--------------------------|8:07|$2|A||8:11|$3|B||8:05|$4|C||8:09|$5|D||8:13|$1|E||8:17|$6|F|-------------------------->SELECT*FROMTABLE(
TUMBLE(TABLEBid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES));
--orwiththenamedparams--note: theDATAparammustbethefirst>SELECT*FROMTABLE(
TUMBLE(
DATA=>TABLEBid,
TIMECOL=>DESCRIPTOR(bidtime),
SIZE=>INTERVAL'10'MINUTES));
------------------------------------------------------|bidtime|price|item|window_start|window_end|------------------------------------------------------|8:07|$2|A|8:00|8:10||8:11|$3|B|8:10|8:20||8:05|$4|C|8:00|8:10||8:09|$5|D|8:00|8:10||8:13|$1|E|8:10|8:20||8:17|$6|F|8:10|8:20|------------------------------------------------------>SELECTwindow_start, window_end, SUM(price)
FROMTABLE(
TUMBLE(TABLEBid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end;
-------------------------------------|window_start|window_end|price|-------------------------------------|8:00|8:10|$11||8:10|8:20|$10|-------------------------------------


滑动窗口

image.png



SELECTwindow_start, window_end, SUM(price) aspriceFROMTABLE(
HOP(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'5'MINUTES, INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end;
--也可以根据其他业务字段groupbySELECTwindow_start, window_end, supplier_id, SUM(price) aspriceFROMTABLE(
HOP(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'5'MINUTES, INTERVAL'10'MINUTES))
GROUPBYsupplier_id,window_start, window_end;
--还可以获取window_timeSELECTwindow_start, window_end, window_time,SUM(price) aspriceFROMTABLE(
HOP(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'5'MINUTES, INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end,window_time;



>SELECT*FROMTABLE(
HOP(TABLEBid, DESCRIPTOR(bidtime), INTERVAL'5'MINUTES, INTERVAL'10'MINUTES));
--orwiththenamedparams--note: theDATAparammustbethefirst>SELECT*FROMTABLE(
HOP(
DATA=>TABLEBid,
TIMECOL=>DESCRIPTOR(bidtime),
SLIDE=>INTERVAL'5'MINUTES,
SIZE=>INTERVAL'10'MINUTES));
------------------------------------------------------|bidtime|price|item|window_start|window_end|------------------------------------------------------|8:07|$2|A|8:00|8:10||8:07|$2|A|8:05|8:15||8:11|$3|B|8:05|8:15||8:11|$3|B|8:10|8:20||8:05|$4|C|8:00|8:10||8:05|$4|C|8:05|8:15||8:09|$5|D|8:00|8:10||8:09|$5|D|8:05|8:15||8:13|$1|E|8:05|8:15||8:13|$1|E|8:10|8:20||8:17|$6|F|8:10|8:20||8:17|$6|F|8:15|8:25|------------------------------------------------------>SELECTwindow_start, window_end, SUM(price)
FROMTABLE(
HOP(TABLEBid, DESCRIPTOR(bidtime), INTERVAL'5'MINUTES, INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end;
--------------------------------------|window_start|window_end|price|--------------------------------------|8:00|8:10|$11||8:05|8:15|$15||8:10|8:20|$10||8:15|8:25|$6|--------------------------------------



累计窗口

Window TVF新增了窗⼝:CUMULATE WINDOW,即累积窗⼝,⽤于计算⼀些累积值,例如:当⽇累积UV。


image.png



SELECTwindow_start, window_end, SUM(price) aspriceFROMTABLE(
CUMULATE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'2'MINUTES, INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end;
--也可以根据其他业务字段groupbySELECTwindow_start, window_end, supplier_id,SUM(price) aspriceFROMTABLE(
CUMULATE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'2'MINUTES, INTERVAL'10'MINUTES))
GROUPBYsupplier_id,window_start, window_end;
--还可以获取window_timeSELECTwindow_start, window_end, window_time,SUM(price) aspriceFROMTABLE(
CUMULATE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'2'MINUTES, INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end,window_time;


>SELECT*FROMTABLE(
CUMULATE(TABLEBid, DESCRIPTOR(bidtime), INTERVAL'2'MINUTES, INTERVAL'10'MINUTES));
--orwiththenamedparams--note: theDATAparammustbethefirst>SELECT*FROMTABLE(
CUMULATE(
DATA=>TABLEBid,
TIMECOL=>DESCRIPTOR(bidtime),
STEP=>INTERVAL'2'MINUTES,
SIZE=>INTERVAL'10'MINUTES));
------------------------------------------------------|bidtime|price|item|window_start|window_end|------------------------------------------------------|8:07|$2|A|8:00|8:08||8:07|$2|A|8:00|8:10||8:11|$3|B|8:10|8:12||8:11|$3|B|8:10|8:14||8:11|$3|B|8:10|8:16||8:11|$3|B|8:10|8:18||8:11|$3|B|8:10|8:20||8:05|$4|C|8:00|8:06||8:05|$4|C|8:00|8:08||8:05|$4|C|8:00|8:10||8:09|$5|D|8:00|8:10||8:13|$1|E|8:10|8:14||8:13|$1|E|8:10|8:16||8:13|$1|E|8:10|8:18||8:13|$1|E|8:10|8:20||8:17|$6|F|8:10|8:18||8:17|$6|F|8:10|8:20|------------------------------------------------------>SELECTwindow_start, window_end, SUM(price)
FROMTABLE(
CUMULATE(TABLEBid, DESCRIPTOR(bidtime), INTERVAL'2'MINUTES, INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end;
--------------------------------------|window_start|window_end|price|--------------------------------------|8:00|8:06|$4||8:00|8:08|$6||8:00|8:10|$11||8:10|8:12|$3||8:10|8:14|$4||8:10|8:16|$4||8:10|8:18|$10||8:10|8:20|$10|--------------------------------------


会话窗口


image.png



>SELECT*FROMBid;
------------------------------------|bidtime|price|item|bidder|------------------------------------|8:07|$2|A|takidau||8:05|$1|A|klk||8:09|$10|B|takidau||8:08|$3|A|klk||8:17|$20|B|klk|------------------------------------>SELECT*FROMTABLE(
SESSION(TABLEBidPARTITIONBYbidder, DESCRIPTOR(bidtime), DESCRIPTOR(bidder), INTERVAL'5'MINUTES);
--orwiththenamedparams--note: theDATAparammustbethefirst>SELECT*FROMSESSION(
data=>TABLEBidsPARTITIONBYbidder,
timecol=>DESCRIPTOR(bidtime),
gap=>INTERVAL'5'MINUTES);
----------------------------------------------------------------|bidtime|price|item|bidder|window_start|window_end|----------------------------------------------------------------|8:07|$2|A|takidau|8:07|8:14||8:05|$1|A|klk|8:05|8:13||8:09|$10|B|takidau|8:07|8:14||8:08|$3|A|klk|8:05|8:13||8:17|$20|B|klk|8:17|8:22|---------------------------------------------------------------->SELECTbidder, window_start, window_end, SUM(price)
FROMTABLE(
SESSION(TABLEBidPARTITIONBYbidder, DESCRIPTOR(bidtime), DESCRIPTOR(bidder), INTERVAL'5'MINUTES)
GROUPBYbidder, window_start, window_end;
-----------------------------------------------|bidder|window_start|window_end|price|-----------------------------------------------|takidau|8:07|8:14|$12||klk|8:05|8:13|$4||klk|8:17|8:22|$20|-----------------------------------------------


窗口应用场景总计

窗口

应用场景

滚动窗口

BI分析(每⼩时的GMV、成交数量、商品成交量排⾏,凡是求每⼀⼩时/每天的某个指标的语意)

滑动窗口

监控、监测分析(最近1⼩时的cpu使⽤率的最⼤值、最⼩值。。。,凡是最近1⼩时/1分钟这种 语意的均可)

会话窗口

⽤户⾏为分析(分析每个⽤户在每个会话期间的点击次数、收藏次数)

累计窗口

固定的开始结束时间,每隔多⻓时间计算⼀次指标(每⼗分钟计算当天的累积UV)


Window TVF多维分析




前⾯的例⼦⾥⾯都是按照所有字段分组,加⼊我们想根据多个分组集合来分组应该怎么办呢?需要写多个SQL单独 执⾏吗?


Window TVF语法的标准化带来了更多的灵活性和扩展性,⽤户可以直接在 window 窗⼝函数上进⾏多维分析。可 以直接进⾏ GROUPING SETS、ROLLUP、CUBE 的分析计算。如果是在 1.13 之前的版本,我们可能需要对这些分 组进⾏单独的 SQL 聚合,再对聚合结果做 union 操作才能达到类似的效果。⽽现在,类似这种多维分析的场景, 可以直接在 window TVF 上支持


需求:


现在想每⼗分钟分别按照supplier_id、item分组求价格平均值,同时要考虑不按照supplier_id、item分组求价格 平均值,还要考虑同时按照supplier_id、item分组求价格平均值:



传统做法:多sql


SELECTwindow_start, window_end, supplier_id, SUM(price) aspriceFROMTABLE(
TUMBLE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end, supplier_id;
SELECTwindow_start, window_end, item, SUM(price) aspriceFROMTABLE(
TUMBLE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end, item;
SELECTwindow_start, window_end, SUM(price) aspriceFROMTABLE(
TUMBLE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end;
SELECTwindow_start, window_end, supplier_id,item,SUM(price) aspriceFROMTABLE(
TUMBLE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end,supplier_id,item;


使用 window TVF 实现


SELECTwindow_start, window_end, window_time,supplier_id,item, SUM(price) aspriceFROMTABLE(
TUMBLE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end, window_time,GROUPINGSETS ((supplier_id,item),
(supplier_id),(item),());

注意:window_start, window_end, window_time必须出现在group by 子句中。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
7月前
|
流计算
Flink窗口——window
Flink窗口——window
43 0
|
4月前
|
SQL 流计算
Flink SQL 在快手实践问题之Window TVF改进窗口聚合功能如何解决
Flink SQL 在快手实践问题之Window TVF改进窗口聚合功能如何解决
35 1
|
7月前
|
SQL 自然语言处理 机器人
Flink sql滚动窗口怎么操作能实现stream里的allowlateness?
【1月更文挑战第3天】【1月更文挑战第12篇】Flink sql滚动窗口怎么操作能实现stream里的allowlateness?
97 1
|
SQL 流计算
如果使用flinksql进行窗口关联操作,然后结合使用自定义表聚合函数,怎么对多表的window_start和window_end进行聚合操作?
如果使用flinksql进行窗口关联操作,然后结合使用自定义表聚合函数,怎么对多表的window_start和window_end进行聚合操作?
114 2
|
API 流计算 Windows
Flink之窗口 (Window) 下篇2
Flink之窗口 (Window) 下篇
138 0
|
缓存 API 流计算
Flink之窗口 (Window) 下篇1
Flink之窗口 (Window) 下篇
157 0
|
存储 程序员 BI
Flink之窗口 (Window) 上篇
Flink之窗口 (Window) 上篇
372 0
|
BI API 数据处理
带你理解并使用flink中的Time、Window(窗口)、Windows Function(窗口函数)
flink中,streaming流式计算被设计为用于处理无限数据集的数据处理引擎,其中无限数据集是指一种源源不断有数据过来的数据集,window (窗口)将无界数据流切割成为有界数据流进行处理的方式。实现方式是将流分发到有限大小的桶(bucket)中进行分析。flink 中的streaming定义了多种流式处理的时间,Event Time(事件时间)、Ingestion Time(接收时间)、Processing Time(处理时间)。
694 0
带你理解并使用flink中的Time、Window(窗口)、Windows Function(窗口函数)
|
流计算
Flink Window 排序
## 概述 - 对增量Window进行输出排序 - WordCount增量(按单词名称排序) - WordCount增量(按单词个数,再单词名称排序)
5332 0
|
SQL 监控 BI
FlinkSQL窗口操作
group windows over windows 滑动窗口 滚动窗口 会话窗口
FlinkSQL窗口操作