FlinkSQL窗口新特性(Window TVF)

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 理解Window TVFWindow TVF使用

理解Window TVF


表值函数(table-valued function, TVF),顾名思义就是指返回值是⼀张表的函数,是不是很神奇,在Oracle、SQL Server等数据库中屡⻅不鲜。⽽在Flink的上⼀个稳定版本1.13中,社区通过FLIP-145提出了窗⼝表值函数(window TVF)的实现,⽤于替代旧版的窗⼝分组(group window)语法



注意:

  • Window TVF⼀个重要的特性是窗⼝定义函数返回的⼀张表,且包含了window_start, window_end,window_time三个字段分别代表窗⼝开始时间戳、结束时间戳、窗⼝时间戳;

  • window_time是窗⼝ 往下发送数据的时间属性,等于window_end - 1毫秒。

  • 特别注意:window_start 和 window_end 列是常规时间戳列,⽽不是时间属性。 因此它们不能⽤作后续基于时间 的操作中的时间属性。

  • 为了往下游操作传播时间属性,您需要在 GROUP BY ⼦句中额外添加 window_time 列。 window_time 是 Windowing TVF ⽣成的,它是窗⼝的时间属性。 然后后⾯的查询可以使⽤该列进⾏后续的基于时 间的操作(不需要再设置时间属性列;额),例如级联窗⼝聚合和窗⼝TopN。



我们可以通过如下链接来理解⼀下:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%253A+Support+SQL+windowing+table-valued+function#FLIP145:SupportSQLwindowingtablevaluedfunction-TumblingWindows



实例



准备⼀个数据⽂件~/test/bid/bids⽂件,⾥⾯存储的是供应商报价信息



2020-04-1508:05,4.00,C,supplier12020-04-1508:07,2.00,A,supplier12020-04-1508:09,5.00,D,supplier22020-04-1508:11,3.00,B,supplier22020-04-1508:13,1.00,E,supplier12020-04-1508:17,6.00,F,supplier2


CREATETABLEbid (
bidtimeTIMESTAMP(3),
priceDECIMAL(10, 2),
itemSTRING,
supplier_idSTRING,
WATERMARKFORbidtimeASbidtime-INTERVAL'1'SECOND) WITH (
'connector'='filesystem',
'path'='/home/hadoop/test/bid',
'format'='csv');
descbid;
SELECT*FROMbid;

滚动窗口

image.png




SELECTwindow_start, window_end, SUM(price) aspriceFROMTABLE(
TUMBLE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end;
--也可以根据其他业务字段groupbySELECTwindow_start, window_end, supplier_id,SUM(price) aspriceFROMTABLE(
TUMBLE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYsupplier_id,window_start, window_end;
--还可以获取window_timeSELECTwindow_start, window_end,window_time, SUM(price) aspriceFROMTABLE(
TUMBLE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end,window_time;


>SELECT*FROMBid;
--------------------------|bidtime|price|item|--------------------------|8:07|$2|A||8:11|$3|B||8:05|$4|C||8:09|$5|D||8:13|$1|E||8:17|$6|F|-------------------------->SELECT*FROMTABLE(
TUMBLE(TABLEBid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES));
--orwiththenamedparams--note: theDATAparammustbethefirst>SELECT*FROMTABLE(
TUMBLE(
DATA=>TABLEBid,
TIMECOL=>DESCRIPTOR(bidtime),
SIZE=>INTERVAL'10'MINUTES));
------------------------------------------------------|bidtime|price|item|window_start|window_end|------------------------------------------------------|8:07|$2|A|8:00|8:10||8:11|$3|B|8:10|8:20||8:05|$4|C|8:00|8:10||8:09|$5|D|8:00|8:10||8:13|$1|E|8:10|8:20||8:17|$6|F|8:10|8:20|------------------------------------------------------>SELECTwindow_start, window_end, SUM(price)
FROMTABLE(
TUMBLE(TABLEBid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end;
-------------------------------------|window_start|window_end|price|-------------------------------------|8:00|8:10|$11||8:10|8:20|$10|-------------------------------------


滑动窗口

image.png



SELECTwindow_start, window_end, SUM(price) aspriceFROMTABLE(
HOP(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'5'MINUTES, INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end;
--也可以根据其他业务字段groupbySELECTwindow_start, window_end, supplier_id, SUM(price) aspriceFROMTABLE(
HOP(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'5'MINUTES, INTERVAL'10'MINUTES))
GROUPBYsupplier_id,window_start, window_end;
--还可以获取window_timeSELECTwindow_start, window_end, window_time,SUM(price) aspriceFROMTABLE(
HOP(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'5'MINUTES, INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end,window_time;



>SELECT*FROMTABLE(
HOP(TABLEBid, DESCRIPTOR(bidtime), INTERVAL'5'MINUTES, INTERVAL'10'MINUTES));
--orwiththenamedparams--note: theDATAparammustbethefirst>SELECT*FROMTABLE(
HOP(
DATA=>TABLEBid,
TIMECOL=>DESCRIPTOR(bidtime),
SLIDE=>INTERVAL'5'MINUTES,
SIZE=>INTERVAL'10'MINUTES));
------------------------------------------------------|bidtime|price|item|window_start|window_end|------------------------------------------------------|8:07|$2|A|8:00|8:10||8:07|$2|A|8:05|8:15||8:11|$3|B|8:05|8:15||8:11|$3|B|8:10|8:20||8:05|$4|C|8:00|8:10||8:05|$4|C|8:05|8:15||8:09|$5|D|8:00|8:10||8:09|$5|D|8:05|8:15||8:13|$1|E|8:05|8:15||8:13|$1|E|8:10|8:20||8:17|$6|F|8:10|8:20||8:17|$6|F|8:15|8:25|------------------------------------------------------>SELECTwindow_start, window_end, SUM(price)
FROMTABLE(
HOP(TABLEBid, DESCRIPTOR(bidtime), INTERVAL'5'MINUTES, INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end;
--------------------------------------|window_start|window_end|price|--------------------------------------|8:00|8:10|$11||8:05|8:15|$15||8:10|8:20|$10||8:15|8:25|$6|--------------------------------------



累计窗口

Window TVF新增了窗⼝:CUMULATE WINDOW,即累积窗⼝,⽤于计算⼀些累积值,例如:当⽇累积UV。


image.png



SELECTwindow_start, window_end, SUM(price) aspriceFROMTABLE(
CUMULATE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'2'MINUTES, INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end;
--也可以根据其他业务字段groupbySELECTwindow_start, window_end, supplier_id,SUM(price) aspriceFROMTABLE(
CUMULATE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'2'MINUTES, INTERVAL'10'MINUTES))
GROUPBYsupplier_id,window_start, window_end;
--还可以获取window_timeSELECTwindow_start, window_end, window_time,SUM(price) aspriceFROMTABLE(
CUMULATE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'2'MINUTES, INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end,window_time;


>SELECT*FROMTABLE(
CUMULATE(TABLEBid, DESCRIPTOR(bidtime), INTERVAL'2'MINUTES, INTERVAL'10'MINUTES));
--orwiththenamedparams--note: theDATAparammustbethefirst>SELECT*FROMTABLE(
CUMULATE(
DATA=>TABLEBid,
TIMECOL=>DESCRIPTOR(bidtime),
STEP=>INTERVAL'2'MINUTES,
SIZE=>INTERVAL'10'MINUTES));
------------------------------------------------------|bidtime|price|item|window_start|window_end|------------------------------------------------------|8:07|$2|A|8:00|8:08||8:07|$2|A|8:00|8:10||8:11|$3|B|8:10|8:12||8:11|$3|B|8:10|8:14||8:11|$3|B|8:10|8:16||8:11|$3|B|8:10|8:18||8:11|$3|B|8:10|8:20||8:05|$4|C|8:00|8:06||8:05|$4|C|8:00|8:08||8:05|$4|C|8:00|8:10||8:09|$5|D|8:00|8:10||8:13|$1|E|8:10|8:14||8:13|$1|E|8:10|8:16||8:13|$1|E|8:10|8:18||8:13|$1|E|8:10|8:20||8:17|$6|F|8:10|8:18||8:17|$6|F|8:10|8:20|------------------------------------------------------>SELECTwindow_start, window_end, SUM(price)
FROMTABLE(
CUMULATE(TABLEBid, DESCRIPTOR(bidtime), INTERVAL'2'MINUTES, INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end;
--------------------------------------|window_start|window_end|price|--------------------------------------|8:00|8:06|$4||8:00|8:08|$6||8:00|8:10|$11||8:10|8:12|$3||8:10|8:14|$4||8:10|8:16|$4||8:10|8:18|$10||8:10|8:20|$10|--------------------------------------


会话窗口


image.png



>SELECT*FROMBid;
------------------------------------|bidtime|price|item|bidder|------------------------------------|8:07|$2|A|takidau||8:05|$1|A|klk||8:09|$10|B|takidau||8:08|$3|A|klk||8:17|$20|B|klk|------------------------------------>SELECT*FROMTABLE(
SESSION(TABLEBidPARTITIONBYbidder, DESCRIPTOR(bidtime), DESCRIPTOR(bidder), INTERVAL'5'MINUTES);
--orwiththenamedparams--note: theDATAparammustbethefirst>SELECT*FROMSESSION(
data=>TABLEBidsPARTITIONBYbidder,
timecol=>DESCRIPTOR(bidtime),
gap=>INTERVAL'5'MINUTES);
----------------------------------------------------------------|bidtime|price|item|bidder|window_start|window_end|----------------------------------------------------------------|8:07|$2|A|takidau|8:07|8:14||8:05|$1|A|klk|8:05|8:13||8:09|$10|B|takidau|8:07|8:14||8:08|$3|A|klk|8:05|8:13||8:17|$20|B|klk|8:17|8:22|---------------------------------------------------------------->SELECTbidder, window_start, window_end, SUM(price)
FROMTABLE(
SESSION(TABLEBidPARTITIONBYbidder, DESCRIPTOR(bidtime), DESCRIPTOR(bidder), INTERVAL'5'MINUTES)
GROUPBYbidder, window_start, window_end;
-----------------------------------------------|bidder|window_start|window_end|price|-----------------------------------------------|takidau|8:07|8:14|$12||klk|8:05|8:13|$4||klk|8:17|8:22|$20|-----------------------------------------------


窗口应用场景总计

窗口

应用场景

滚动窗口

BI分析(每⼩时的GMV、成交数量、商品成交量排⾏,凡是求每⼀⼩时/每天的某个指标的语意)

滑动窗口

监控、监测分析(最近1⼩时的cpu使⽤率的最⼤值、最⼩值。。。,凡是最近1⼩时/1分钟这种 语意的均可)

会话窗口

⽤户⾏为分析(分析每个⽤户在每个会话期间的点击次数、收藏次数)

累计窗口

固定的开始结束时间,每隔多⻓时间计算⼀次指标(每⼗分钟计算当天的累积UV)


Window TVF多维分析




前⾯的例⼦⾥⾯都是按照所有字段分组,加⼊我们想根据多个分组集合来分组应该怎么办呢?需要写多个SQL单独 执⾏吗?


Window TVF语法的标准化带来了更多的灵活性和扩展性,⽤户可以直接在 window 窗⼝函数上进⾏多维分析。可 以直接进⾏ GROUPING SETS、ROLLUP、CUBE 的分析计算。如果是在 1.13 之前的版本,我们可能需要对这些分 组进⾏单独的 SQL 聚合,再对聚合结果做 union 操作才能达到类似的效果。⽽现在,类似这种多维分析的场景, 可以直接在 window TVF 上支持


需求:


现在想每⼗分钟分别按照supplier_id、item分组求价格平均值,同时要考虑不按照supplier_id、item分组求价格 平均值,还要考虑同时按照supplier_id、item分组求价格平均值:



传统做法:多sql


SELECTwindow_start, window_end, supplier_id, SUM(price) aspriceFROMTABLE(
TUMBLE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end, supplier_id;
SELECTwindow_start, window_end, item, SUM(price) aspriceFROMTABLE(
TUMBLE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end, item;
SELECTwindow_start, window_end, SUM(price) aspriceFROMTABLE(
TUMBLE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end;
SELECTwindow_start, window_end, supplier_id,item,SUM(price) aspriceFROMTABLE(
TUMBLE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end,supplier_id,item;


使用 window TVF 实现


SELECTwindow_start, window_end, window_time,supplier_id,item, SUM(price) aspriceFROMTABLE(
TUMBLE(TABLEbid, DESCRIPTOR(bidtime), INTERVAL'10'MINUTES))
GROUPBYwindow_start, window_end, window_time,GROUPINGSETS ((supplier_id,item),
(supplier_id),(item),());

注意:window_start, window_end, window_time必须出现在group by 子句中。

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
SQL 存储 API
Flink教程(20)- Flink高级特性(双流Join)
Flink教程(20)- Flink高级特性(双流Join)
1031 0
|
存储 消息中间件 SQL
Flink 必知必会经典课程8:Flink Connector 详解
关于Flink Connector的详解,本文将通过四部分展开介绍:1. 连接器;2. Source API;3. Sink API;4. Collector的未来发展。
Flink 必知必会经典课程8:Flink Connector 详解
|
流计算 Java 监控
如何分析及处理 Flink 反压?
反压(backpressure)是实时计算应用开发中,特别是流式计算中,十分常见的问题。反压意味着数据管道中某个节点成为瓶颈,处理速率跟不上上游发送数据的速率,而需要对上游进行限速。
如何分析及处理 Flink 反压?
|
SQL 双11 流计算
Flink SQL 功能解密系列 —— 流计算“撤回(Retraction)”案例分析
通俗讲retract就是传统数据里面的更新操作,也就是说retract是流式计算场景下对数据更新的处理方式。
|
9月前
|
Cloud Native Apache 流计算
资料合集|Flink Forward Asia 2024 上海站
Apache Flink 年度技术盛会聚焦“回顾过去,展望未来”,涵盖流式湖仓、流批一体、Data+AI 等八大核心议题,近百家厂商参与,深入探讨前沿技术发展。小松鼠为大家整理了 FFA 2024 演讲 PPT ,可在线阅读和下载。
8395 18
资料合集|Flink Forward Asia 2024 上海站
|
8月前
|
存储 SQL 数据挖掘
深入理解 Flink 中的 State
Flink 的 State(状态)是其四大核心之一,为流处理和批处理任务提供强大支持。本文深入探讨 Flink 中的状态管理,涵盖 State 在 HDFS 中的存储格式、存在形式(如 ValueState、ListState 等)、使用方法、过期时间 TTL 和清除策略,并介绍 Table API 和 SQL 模块中的状态管理。通过实际案例,帮助读者理解如何在电商订单处理、实时日志统计等场景中有效利用状态管理功能。
730 16
|
SQL Oracle 关系型数据库
Flink的表值函数(Table-Valued Function,TVF)是一种返回值是一张表的函数
【2月更文挑战第17天】Flink的表值函数(Table-Valued Function,TVF)是一种返回值是一张表的函数
275 1
|
SQL 流计算
Flink SQL 在快手实践问题之使用Dynamic Cumulate Window绘制直播间累计UV曲线如何解决
Flink SQL 在快手实践问题之使用Dynamic Cumulate Window绘制直播间累计UV曲线如何解决
216 1
|
Oracle NoSQL 关系型数据库
实时计算 Flink版产品使用问题之在BEGINSTATEMENTSET;END;语句中,如何同时写三个CDAS语句
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
数据库
数仓建设:数据域和主题域是什么关系?
数仓建设:数据域和主题域是什么关系?
9762 2
数仓建设:数据域和主题域是什么关系?