FlinkSQL窗口新特性(Window TVF)

简介: 理解Window TVFWindow TVF使用

理解Window TVF


表值函数(table-valued function, TVF),顾名思义就是指返回值是⼀张表的函数,是不是很神奇,在Oracle、SQL Server等数据库中屡⻅不鲜。⽽在Flink的上⼀个稳定版本1.13中,社区通过FLIP-145提出了窗⼝表值函数(window TVF)的实现,⽤于替代旧版的窗⼝分组(group window)语法



注意:

  • Window TVF⼀个重要的特性是窗⼝定义函数返回的⼀张表,且包含了window_start, window_end,window_time三个字段分别代表窗⼝开始时间戳、结束时间戳、窗⼝时间戳;

  • window_time是窗⼝ 往下发送数据的时间属性,等于window_end - 1毫秒。

  • 特别注意:window_start 和 window_end 列是常规时间戳列,⽽不是时间属性。 因此它们不能⽤作后续基于时间 的操作中的时间属性。

  • 为了往下游操作传播时间属性,您需要在 GROUP BY ⼦句中额外添加 window_time 列。 window_time 是 Windowing TVF ⽣成的,它是窗⼝的时间属性。 然后后⾯的查询可以使⽤该列进⾏后续的基于时 间的操作(不需要再设置时间属性列;额),例如级联窗⼝聚合和窗⼝TopN。



我们可以通过如下链接来理解⼀下:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%253A+Support+SQL+windowing+table-valued+function#FLIP145:SupportSQLwindowingtablevaluedfunction-TumblingWindows



实例



准备⼀个数据⽂件~/test/bid/bids⽂件,⾥⾯存储的是供应商报价信息



2020-04-1508:05,4.00,C,supplier12020-04-1508:07,2.00,A,supplier12020-04-1508:09,5.00,D,supplier22020-04-1508:11,3.00,B,supplier22020-04-1508:13,1.00,E,supplier12020-04-1508:17,6.00,F,supplier2


CREATETABLEbid (
bidtimeTIMESTAMP(3),
priceDECIMAL(10, 2),
itemSTRING,
supplier_idSTRING,
WATERMARKFORbidtimeASbidtime-INTERVAL'1'SECOND) WITH (
'connector'='filesystem',
'path'='/home/hadoop/test/bid',
'format'='csv');
descbid;
SELECT*FROMbid;

滚动窗口

image.png




SELECTwindow_start, window_end, SUM(price) aspriceFROMTABLE(
TUMBLE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end;
--也可以根据其他业务字段groupbySELECTwindow_start, window_end, supplier_id,SUM(price) aspriceFROMTABLE(
TUMBLE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYsupplier_id,window_start, window_end;
--还可以获取window_timeSELECTwindow_start, window_end,window_time, SUM(price) aspriceFROMTABLE(
TUMBLE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end,window_time;


>SELECT*FROMBid;
--------------------------|bidtime|price|item|--------------------------|8:07|$2|A||8:11|$3|B||8:05|$4|C||8:09|$5|D||8:13|$1|E||8:17|$6|F|-------------------------->SELECT*FROMTABLE(
TUMBLE(TABLEBid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES));
--orwiththenamedparams--note: theDATAparammustbethefirst>SELECT*FROMTABLE(
TUMBLE(
DATA=>TABLEBid,
TIMECOL=>DESCRIPTOR(bidtime),
SIZE=>INTERVAL'10'MINUTES));
------------------------------------------------------|bidtime|price|item|window_start|window_end|------------------------------------------------------|8:07|$2|A|8:00|8:10||8:11|$3|B|8:10|8:20||8:05|$4|C|8:00|8:10||8:09|$5|D|8:00|8:10||8:13|$1|E|8:10|8:20||8:17|$6|F|8:10|8:20|------------------------------------------------------>SELECTwindow_start, window_end, SUM(price)
FROMTABLE(
TUMBLE(TABLEBid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end;
-------------------------------------|window_start|window_end|price|-------------------------------------|8:00|8:10|$11||8:10|8:20|$10|-------------------------------------


滑动窗口

image.png



SELECTwindow_start, window_end, SUM(price) aspriceFROMTABLE(
HOP(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'5'MINUTES, INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end;
--也可以根据其他业务字段groupbySELECTwindow_start, window_end, supplier_id, SUM(price) aspriceFROMTABLE(
HOP(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'5'MINUTES, INTERVAL'10'MINUTES))
GROUPBYsupplier_id,window_start, window_end;
--还可以获取window_timeSELECTwindow_start, window_end, window_time,SUM(price) aspriceFROMTABLE(
HOP(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'5'MINUTES, INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end,window_time;



>SELECT*FROMTABLE(
HOP(TABLEBid, DESCRIPTOR(bidtime), INTERVAL'5'MINUTES, INTERVAL'10'MINUTES));
--orwiththenamedparams--note: theDATAparammustbethefirst>SELECT*FROMTABLE(
HOP(
DATA=>TABLEBid,
TIMECOL=>DESCRIPTOR(bidtime),
SLIDE=>INTERVAL'5'MINUTES,
SIZE=>INTERVAL'10'MINUTES));
------------------------------------------------------|bidtime|price|item|window_start|window_end|------------------------------------------------------|8:07|$2|A|8:00|8:10||8:07|$2|A|8:05|8:15||8:11|$3|B|8:05|8:15||8:11|$3|B|8:10|8:20||8:05|$4|C|8:00|8:10||8:05|$4|C|8:05|8:15||8:09|$5|D|8:00|8:10||8:09|$5|D|8:05|8:15||8:13|$1|E|8:05|8:15||8:13|$1|E|8:10|8:20||8:17|$6|F|8:10|8:20||8:17|$6|F|8:15|8:25|------------------------------------------------------>SELECTwindow_start, window_end, SUM(price)
FROMTABLE(
HOP(TABLEBid, DESCRIPTOR(bidtime), INTERVAL'5'MINUTES, INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end;
--------------------------------------|window_start|window_end|price|--------------------------------------|8:00|8:10|$11||8:05|8:15|$15||8:10|8:20|$10||8:15|8:25|$6|--------------------------------------



累计窗口

Window TVF新增了窗⼝:CUMULATE WINDOW,即累积窗⼝,⽤于计算⼀些累积值,例如:当⽇累积UV。


image.png



SELECTwindow_start, window_end, SUM(price) aspriceFROMTABLE(
CUMULATE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'2'MINUTES, INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end;
--也可以根据其他业务字段groupbySELECTwindow_start, window_end, supplier_id,SUM(price) aspriceFROMTABLE(
CUMULATE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'2'MINUTES, INTERVAL'10'MINUTES))
GROUPBYsupplier_id,window_start, window_end;
--还可以获取window_timeSELECTwindow_start, window_end, window_time,SUM(price) aspriceFROMTABLE(
CUMULATE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'2'MINUTES, INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end,window_time;


>SELECT*FROMTABLE(
CUMULATE(TABLEBid, DESCRIPTOR(bidtime), INTERVAL'2'MINUTES, INTERVAL'10'MINUTES));
--orwiththenamedparams--note: theDATAparammustbethefirst>SELECT*FROMTABLE(
CUMULATE(
DATA=>TABLEBid,
TIMECOL=>DESCRIPTOR(bidtime),
STEP=>INTERVAL'2'MINUTES,
SIZE=>INTERVAL'10'MINUTES));
------------------------------------------------------|bidtime|price|item|window_start|window_end|------------------------------------------------------|8:07|$2|A|8:00|8:08||8:07|$2|A|8:00|8:10||8:11|$3|B|8:10|8:12||8:11|$3|B|8:10|8:14||8:11|$3|B|8:10|8:16||8:11|$3|B|8:10|8:18||8:11|$3|B|8:10|8:20||8:05|$4|C|8:00|8:06||8:05|$4|C|8:00|8:08||8:05|$4|C|8:00|8:10||8:09|$5|D|8:00|8:10||8:13|$1|E|8:10|8:14||8:13|$1|E|8:10|8:16||8:13|$1|E|8:10|8:18||8:13|$1|E|8:10|8:20||8:17|$6|F|8:10|8:18||8:17|$6|F|8:10|8:20|------------------------------------------------------>SELECTwindow_start, window_end, SUM(price)
FROMTABLE(
CUMULATE(TABLEBid, DESCRIPTOR(bidtime), INTERVAL'2'MINUTES, INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end;
--------------------------------------|window_start|window_end|price|--------------------------------------|8:00|8:06|$4||8:00|8:08|$6||8:00|8:10|$11||8:10|8:12|$3||8:10|8:14|$4||8:10|8:16|$4||8:10|8:18|$10||8:10|8:20|$10|--------------------------------------


会话窗口


image.png



>SELECT*FROMBid;
------------------------------------|bidtime|price|item|bidder|------------------------------------|8:07|$2|A|takidau||8:05|$1|A|klk||8:09|$10|B|takidau||8:08|$3|A|klk||8:17|$20|B|klk|------------------------------------>SELECT*FROMTABLE(
SESSION(TABLEBidPARTITIONBYbidder, DESCRIPTOR(bidtime), DESCRIPTOR(bidder), INTERVAL'5'MINUTES);
--orwiththenamedparams--note: theDATAparammustbethefirst>SELECT*FROMSESSION(
data=>TABLEBidsPARTITIONBYbidder,
timecol=>DESCRIPTOR(bidtime),
gap=>INTERVAL'5'MINUTES);
----------------------------------------------------------------|bidtime|price|item|bidder|window_start|window_end|----------------------------------------------------------------|8:07|$2|A|takidau|8:07|8:14||8:05|$1|A|klk|8:05|8:13||8:09|$10|B|takidau|8:07|8:14||8:08|$3|A|klk|8:05|8:13||8:17|$20|B|klk|8:17|8:22|---------------------------------------------------------------->SELECTbidder, window_start, window_end, SUM(price)
FROMTABLE(
SESSION(TABLEBidPARTITIONBYbidder, DESCRIPTOR(bidtime), DESCRIPTOR(bidder), INTERVAL'5'MINUTES)
GROUPBYbidder, window_start, window_end;
-----------------------------------------------|bidder|window_start|window_end|price|-----------------------------------------------|takidau|8:07|8:14|$12||klk|8:05|8:13|$4||klk|8:17|8:22|$20|-----------------------------------------------


窗口应用场景总计

窗口

应用场景

滚动窗口

BI分析(每⼩时的GMV、成交数量、商品成交量排⾏,凡是求每⼀⼩时/每天的某个指标的语意)

滑动窗口

监控、监测分析(最近1⼩时的cpu使⽤率的最⼤值、最⼩值。。。,凡是最近1⼩时/1分钟这种 语意的均可)

会话窗口

⽤户⾏为分析(分析每个⽤户在每个会话期间的点击次数、收藏次数)

累计窗口

固定的开始结束时间,每隔多⻓时间计算⼀次指标(每⼗分钟计算当天的累积UV)


Window TVF多维分析




前⾯的例⼦⾥⾯都是按照所有字段分组,加⼊我们想根据多个分组集合来分组应该怎么办呢?需要写多个SQL单独 执⾏吗?


Window TVF语法的标准化带来了更多的灵活性和扩展性,⽤户可以直接在 window 窗⼝函数上进⾏多维分析。可 以直接进⾏ GROUPING SETS、ROLLUP、CUBE 的分析计算。如果是在 1.13 之前的版本,我们可能需要对这些分 组进⾏单独的 SQL 聚合,再对聚合结果做 union 操作才能达到类似的效果。⽽现在,类似这种多维分析的场景, 可以直接在 window TVF 上支持


需求:


现在想每⼗分钟分别按照supplier_id、item分组求价格平均值,同时要考虑不按照supplier_id、item分组求价格 平均值,还要考虑同时按照supplier_id、item分组求价格平均值:



传统做法:多sql


SELECTwindow_start, window_end, supplier_id, SUM(price) aspriceFROMTABLE(
TUMBLE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end, supplier_id;
SELECTwindow_start, window_end, item, SUM(price) aspriceFROMTABLE(
TUMBLE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end, item;
SELECTwindow_start, window_end, SUM(price) aspriceFROMTABLE(
TUMBLE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end;
SELECTwindow_start, window_end, supplier_id,item,SUM(price) aspriceFROMTABLE(
TUMBLE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end,supplier_id,item;


使用 window TVF 实现


SELECTwindow_start, window_end, window_time,supplier_id,item, SUM(price) aspriceFROMTABLE(
TUMBLE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end, window_time,GROUPINGSETS ((supplier_id,item),
(supplier_id),(item),());

注意:window_start, window_end, window_time必须出现在group by 子句中。

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
存储 分布式计算 Apache
构建 Streaming Lakehouse:使用 Paimon 和 Hudi 的性能对比
Apache Paimon 和 Apache Hudi 作为数据湖存储格式,有着高吞吐的写入和低延迟的查询性能,是构建数据湖的常用组件。本文将在阿里云EMR 上,针对数据实时入湖场景,对 Paimon 和 Hudi 的性能进行比对,然后分别以 Paimon 和 Hudi 作为统一存储搭建准实时数仓。
60917 9
构建 Streaming Lakehouse:使用 Paimon 和 Hudi 的性能对比
|
流计算
Flink自定义source、自定义sink
Flink自定义source、自定义sink
545 0
|
SQL 双11 流计算
Flink SQL 功能解密系列 —— 流计算“撤回(Retraction)”案例分析
通俗讲retract就是传统数据里面的更新操作,也就是说retract是流式计算场景下对数据更新的处理方式。
|
存储 SQL 缓存
Flink 2.0 存算分离状态存储 — ForSt DB 
本文整理自阿里云技术专家兰兆千在Flink Forward Asia 2024上的分享,主要介绍Flink 2.0的存算分离架构、全新状态存储内核ForSt DB及工作进展与未来展望。Flink 2.0通过存算分离解决了本地磁盘瓶颈、检查点资源尖峰和作业恢复速度慢等问题,提升了云原生部署能力。ForSt DB作为嵌入式Key-value存储内核,支持远端读写、批量并发优化和快速检查点等功能。性能测试表明,ForSt在异步访问和本地缓存支持下表现卓越。未来,Flink将继续完善SQL Operator的异步优化,并引入更多流特性支持。
1533 88
Flink 2.0 存算分离状态存储 — ForSt DB 
|
SQL 分布式计算 数据处理
FlinkSQL开发经验分享
FlinkSQL开发经验分享
614 8
|
SQL 分布式计算 流计算
官宣|Apache Paimon 1.0 发布公告
官宣|Apache Paimon 1.0 发布公告
906 8
|
缓存 监控 数据处理
Flink 四大基石之窗口(Window)使用详解
在流处理场景中,窗口(Window)用于将无限数据流切分成有限大小的“块”,以便进行计算。Flink 提供了多种窗口类型,如时间窗口(滚动、滑动、会话)和计数窗口,通过窗口大小、滑动步长和偏移量等属性控制数据切分。窗口函数包括增量聚合函数、全窗口函数和ProcessWindowFunction,支持灵活的数据处理。应用案例展示了如何使用窗口进行实时流量统计和电商销售分析。
2456 28
|
存储 SQL 数据挖掘
深入理解 Flink 中的 State
Flink 的 State(状态)是其四大核心之一,为流处理和批处理任务提供强大支持。本文深入探讨 Flink 中的状态管理,涵盖 State 在 HDFS 中的存储格式、存在形式(如 ValueState、ListState 等)、使用方法、过期时间 TTL 和清除策略,并介绍 Table API 和 SQL 模块中的状态管理。通过实际案例,帮助读者理解如何在电商订单处理、实时日志统计等场景中有效利用状态管理功能。
1402 16
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
1283 14
|
数据处理 数据安全/隐私保护 流计算
Flink 三种时间窗口、窗口处理函数使用及案例
Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。
1848 27