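Understanding Window TVFs
A table-valued function (TVF) is, as the name suggests, a function whose return value is a table. That may sound exotic, but it is common in databases such as Oracle and SQL Server. In Flink 1.13, the previous stable release at the time of writing, the community introduced window table-valued functions (window TVFs) through FLIP-145 to replace the legacy group window syntax.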
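Notes:
- An important property of window TVFs is that the window function returns a table. That table contains three extra columns, window_start, window_end, and window_time, which hold the window start timestamp, the window end timestamp, and the window time attribute.
- window_time is the time attribute the window emits downstream. It equals window_end minus 1 millisecond.
- Important: window_start and window_end are regular timestamp columns, not time attributes, so they cannot be used as time attributes in subsequent time-based operations.
- To propagate a time attribute to downstream operations, add the window_time column to the GROUP BY clause. window_time is generated by the windowing TVF and is the window's time attribute. Later queries can then use it for further time-based operations, such as cascading window aggregation and window Top-N, without declaring a time attribute column again.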
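The relationship between the three window columns can be sketched in a few lines of plain Python. This is only an illustration of the arithmetic described in the notes above, not Flink code; the helper name tumble_window is hypothetical, and it assumes windows are aligned to the Unix epoch.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def tumble_window(ts, size):
    """Return (window_start, window_end, window_time) for a tumbling window.

    Assumption: windows are aligned to the Unix epoch.
    """
    offset = (ts - EPOCH) % size          # how far ts is into its window
    window_start = ts - offset
    window_end = window_start + size
    # window_time is the time attribute: window_end minus 1 millisecond
    window_time = window_end - timedelta(milliseconds=1)
    return window_start, window_end, window_time

start, end, wtime = tumble_window(datetime(2020, 4, 15, 8, 5),
                                  timedelta(minutes=10))
print(start, end, wtime)
```

For a bid at 08:05 and a 10-minute window, this yields the window 08:00 to 08:10 with a window_time of 08:09:59.999.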
Examples
Prepare a data file ~/test/bid/bids that stores supplier bid information:
2020-04-15 08:05,4.00,C,supplier1
2020-04-15 08:07,2.00,A,supplier1
2020-04-15 08:09,5.00,D,supplier2
2020-04-15 08:11,3.00,B,supplier2
2020-04-15 08:13,1.00,E,supplier1
2020-04-15 08:17,6.00,F,supplier2
CREATE TABLE bid (
  bidtime TIMESTAMP(3),
  price DECIMAL(10, 2),
  item STRING,
  supplier_id STRING,
  WATERMARK FOR bidtime AS bidtime - INTERVAL '1' SECOND
) WITH (
  'connector' = 'filesystem',
  'path' = '/home/hadoop/test/bid',
  'format' = 'csv'
);

DESC bid;

SELECT * FROM bid;
Tumbling windows
SELECT window_start, window_end, SUM(price) AS price
FROM TABLE(
  TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;

-- You can also group by other business columns
SELECT window_start, window_end, supplier_id, SUM(price) AS price
FROM TABLE(
  TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY supplier_id, window_start, window_end;

-- You can also select window_time
SELECT window_start, window_end, window_time, SUM(price) AS price
FROM TABLE(
  TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, window_time;
> SELECT * FROM Bid;
--------------------------
| bidtime | price | item |
--------------------------
| 8:07    | $2    | A    |
| 8:11    | $3    | B    |
| 8:05    | $4    | C    |
| 8:09    | $5    | D    |
| 8:13    | $1    | E    |
| 8:17    | $6    | F    |
--------------------------

> SELECT * FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
> SELECT * FROM TABLE(
    TUMBLE(
      DATA => TABLE Bid,
      TIMECOL => DESCRIPTOR(bidtime),
      SIZE => INTERVAL '10' MINUTES));
------------------------------------------------------
| bidtime | price | item | window_start | window_end |
------------------------------------------------------
| 8:07    | $2    | A    | 8:00         | 8:10       |
| 8:11    | $3    | B    | 8:10         | 8:20       |
| 8:05    | $4    | C    | 8:00         | 8:10       |
| 8:09    | $5    | D    | 8:00         | 8:10       |
| 8:13    | $1    | E    | 8:10         | 8:20       |
| 8:17    | $6    | F    | 8:10         | 8:20       |
------------------------------------------------------

> SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;
------------------------------------
| window_start | window_end | price |
------------------------------------
| 8:00         | 8:10       | $11   |
| 8:10         | 8:20       | $10   |
------------------------------------
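To see why the aggregate comes out as $11 and $10, the tumbling assignment can be reproduced in plain Python. This is a rough simulation under simplifying assumptions (times as minutes of the day, integer prices), not how Flink executes the query; the names tumble_sum, to_minutes, and fmt are hypothetical helpers.

```python
from collections import defaultdict

# (bidtime, price, item) rows from the Bid table above
BIDS = [("8:07", 2, "A"), ("8:11", 3, "B"), ("8:05", 4, "C"),
        ("8:09", 5, "D"), ("8:13", 1, "E"), ("8:17", 6, "F")]

def to_minutes(hhmm):
    """Convert "H:MM" to minutes since midnight."""
    h, m = hhmm.split(":")
    return int(h) * 60 + int(m)

def fmt(minutes):
    """Convert minutes since midnight back to "H:MM"."""
    return f"{minutes // 60}:{minutes % 60:02d}"

def tumble_sum(bids, size):
    """Sum prices per tumbling window of `size` minutes."""
    totals = defaultdict(int)
    for bidtime, price, _item in bids:
        t = to_minutes(bidtime)
        start = t - t % size              # each row falls into exactly one window
        totals[(start, start + size)] += price
    return {(fmt(s), fmt(e)): total for (s, e), total in sorted(totals.items())}

tumble_totals = tumble_sum(BIDS, 10)
print(tumble_totals)
```

Each row lands in exactly one window, which is the defining property of a tumbling window.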
Sliding windows
SELECT window_start, window_end, SUM(price) AS price
FROM TABLE(
  HOP(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;

-- You can also group by other business columns
SELECT window_start, window_end, supplier_id, SUM(price) AS price
FROM TABLE(
  HOP(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
GROUP BY supplier_id, window_start, window_end;

-- You can also select window_time
SELECT window_start, window_end, window_time, SUM(price) AS price
FROM TABLE(
  HOP(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, window_time;
> SELECT * FROM TABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
> SELECT * FROM TABLE(
    HOP(
      DATA => TABLE Bid,
      TIMECOL => DESCRIPTOR(bidtime),
      SLIDE => INTERVAL '5' MINUTES,
      SIZE => INTERVAL '10' MINUTES));
------------------------------------------------------
| bidtime | price | item | window_start | window_end |
------------------------------------------------------
| 8:07    | $2    | A    | 8:00         | 8:10       |
| 8:07    | $2    | A    | 8:05         | 8:15       |
| 8:11    | $3    | B    | 8:05         | 8:15       |
| 8:11    | $3    | B    | 8:10         | 8:20       |
| 8:05    | $4    | C    | 8:00         | 8:10       |
| 8:05    | $4    | C    | 8:05         | 8:15       |
| 8:09    | $5    | D    | 8:00         | 8:10       |
| 8:09    | $5    | D    | 8:05         | 8:15       |
| 8:13    | $1    | E    | 8:05         | 8:15       |
| 8:13    | $1    | E    | 8:10         | 8:20       |
| 8:17    | $6    | F    | 8:10         | 8:20       |
| 8:17    | $6    | F    | 8:15         | 8:25       |
------------------------------------------------------

> SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;
------------------------------------
| window_start | window_end | price |
------------------------------------
| 8:00         | 8:10       | $11   |
| 8:05         | 8:15       | $15   |
| 8:10         | 8:20       | $10   |
| 8:15         | 8:25       | $6    |
------------------------------------
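The duplication of rows across overlapping windows can be sketched in plain Python. This is an illustrative simulation with times expressed as minutes of the day, not Flink's implementation; hop_windows and hop_sum are hypothetical helper names.

```python
from collections import defaultdict

# (bidtime in minutes since midnight, price, item), same rows as the Bid table
BIDS = [(487, 2, "A"), (491, 3, "B"), (485, 4, "C"),
        (489, 5, "D"), (493, 1, "E"), (497, 6, "F")]

def fmt(minutes):
    """Convert minutes since midnight to "H:MM"."""
    return f"{minutes // 60}:{minutes % 60:02d}"

def hop_windows(t, slide, size):
    """Return every [start, end) window of length `size` that contains t.

    Window starts are multiples of `slide`, so a row belongs to
    size / slide windows when size is a multiple of slide.
    """
    last_start = t - t % slide
    return [(s, s + size) for s in sorted(range(last_start, t - size, -slide))]

def hop_sum(bids, slide, size):
    """Sum prices per hopping window."""
    totals = defaultdict(int)
    for t, price, _item in bids:
        for window in hop_windows(t, slide, size):
            totals[window] += price
    return {(fmt(s), fmt(e)): total for (s, e), total in sorted(totals.items())}

hop_totals = hop_sum(BIDS, slide=5, size=10)
print(hop_totals)
```

With a 5-minute slide and a 10-minute size, every row is counted twice, once in each of the two windows that cover it.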
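Cumulative windows
Window TVFs add a new window type, the CUMULATE window (cumulative window). It is used to compute cumulative values, for example cumulative UV for the current day.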
SELECT window_start, window_end, SUM(price) AS price
FROM TABLE(
  CUMULATE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;

-- You can also group by other business columns
SELECT window_start, window_end, supplier_id, SUM(price) AS price
FROM TABLE(
  CUMULATE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY supplier_id, window_start, window_end;

-- You can also select window_time
SELECT window_start, window_end, window_time, SUM(price) AS price
FROM TABLE(
  CUMULATE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, window_time;
> SELECT * FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
> SELECT * FROM TABLE(
    CUMULATE(
      DATA => TABLE Bid,
      TIMECOL => DESCRIPTOR(bidtime),
      STEP => INTERVAL '2' MINUTES,
      SIZE => INTERVAL '10' MINUTES));
------------------------------------------------------
| bidtime | price | item | window_start | window_end |
------------------------------------------------------
| 8:07    | $2    | A    | 8:00         | 8:08       |
| 8:07    | $2    | A    | 8:00         | 8:10       |
| 8:11    | $3    | B    | 8:10         | 8:12       |
| 8:11    | $3    | B    | 8:10         | 8:14       |
| 8:11    | $3    | B    | 8:10         | 8:16       |
| 8:11    | $3    | B    | 8:10         | 8:18       |
| 8:11    | $3    | B    | 8:10         | 8:20       |
| 8:05    | $4    | C    | 8:00         | 8:06       |
| 8:05    | $4    | C    | 8:00         | 8:08       |
| 8:05    | $4    | C    | 8:00         | 8:10       |
| 8:09    | $5    | D    | 8:00         | 8:10       |
| 8:13    | $1    | E    | 8:10         | 8:14       |
| 8:13    | $1    | E    | 8:10         | 8:16       |
| 8:13    | $1    | E    | 8:10         | 8:18       |
| 8:13    | $1    | E    | 8:10         | 8:20       |
| 8:17    | $6    | F    | 8:10         | 8:18       |
| 8:17    | $6    | F    | 8:10         | 8:20       |
------------------------------------------------------

> SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;
------------------------------------
| window_start | window_end | price |
------------------------------------
| 8:00         | 8:06       | $4    |
| 8:00         | 8:08       | $6    |
| 8:00         | 8:10       | $11   |
| 8:10         | 8:12       | $3    |
| 8:10         | 8:14       | $4    |
| 8:10         | 8:16       | $4    |
| 8:10         | 8:18       | $10   |
| 8:10         | 8:20       | $10   |
------------------------------------
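The way a cumulative window grows from a fixed start can also be reproduced in plain Python. The sketch below is a simulation under simplifying assumptions (times as minutes of the day), not Flink's implementation; cumulate_windows and cumulate_sum are hypothetical helper names.

```python
from collections import defaultdict

# (bidtime in minutes since midnight, price, item), same rows as the Bid table
BIDS = [(487, 2, "A"), (491, 3, "B"), (485, 4, "C"),
        (489, 5, "D"), (493, 1, "E"), (497, 6, "F")]

def fmt(minutes):
    """Convert minutes since midnight to "H:MM"."""
    return f"{minutes // 60}:{minutes % 60:02d}"

def cumulate_windows(t, step, size):
    """Return every cumulative window that contains t.

    All windows share the same start (a multiple of `size`); the end
    grows by `step` until it reaches start + size.
    """
    start = t - t % size
    ends = range(start + step, start + size + 1, step)
    return [(start, end) for end in ends if end > t]

def cumulate_sum(bids, step, size):
    """Sum prices per cumulative window."""
    totals = defaultdict(int)
    for t, price, _item in bids:
        for window in cumulate_windows(t, step, size):
            totals[window] += price
    return {(fmt(s), fmt(e)): total for (s, e), total in sorted(totals.items())}

cumulate_totals = cumulate_sum(BIDS, step=2, size=10)
print(cumulate_totals)
```

Windows that contain no rows, such as 8:00 to 8:02, never show up in the result, which matches the aggregate table above.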
Session windows
Note: session window TVFs may not be available in every Flink version, and the argument list below follows the form used in the original example, so check the documentation for your version before relying on it.

> SELECT * FROM Bid;
------------------------------------
| bidtime | price | item | bidder  |
------------------------------------
| 8:07    | $2    | A    | takidau |
| 8:05    | $1    | A    | klk     |
| 8:09    | $10   | B    | takidau |
| 8:08    | $3    | A    | klk     |
| 8:17    | $20   | B    | klk     |
------------------------------------

> SELECT * FROM TABLE(
    SESSION(TABLE Bid PARTITION BY bidder, DESCRIPTOR(bidtime), DESCRIPTOR(bidder), INTERVAL '5' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
> SELECT * FROM TABLE(
    SESSION(
      data => TABLE Bid PARTITION BY bidder,
      timecol => DESCRIPTOR(bidtime),
      gap => INTERVAL '5' MINUTES));
----------------------------------------------------------------
| bidtime | price | item | bidder  | window_start | window_end |
----------------------------------------------------------------
| 8:07    | $2    | A    | takidau | 8:07         | 8:14       |
| 8:05    | $1    | A    | klk     | 8:05         | 8:13       |
| 8:09    | $10   | B    | takidau | 8:07         | 8:14       |
| 8:08    | $3    | A    | klk     | 8:05         | 8:13       |
| 8:17    | $20   | B    | klk     | 8:17         | 8:22       |
----------------------------------------------------------------

> SELECT bidder, window_start, window_end, SUM(price)
  FROM TABLE(
    SESSION(TABLE Bid PARTITION BY bidder, DESCRIPTOR(bidtime), DESCRIPTOR(bidder), INTERVAL '5' MINUTES))
  GROUP BY bidder, window_start, window_end;
----------------------------------------------
| bidder  | window_start | window_end | price |
----------------------------------------------
| takidau | 8:07         | 8:14       | $12   |
| klk     | 8:05         | 8:13       | $4    |
| klk     | 8:17         | 8:22       | $20   |
----------------------------------------------
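The session boundaries in the result can be understood as gap-based merging per bidder. The plain Python sketch below illustrates that idea under stated assumptions: times are minutes of the day, and a bid extends a session only if it arrives strictly before the current session end (the exact boundary case is not exercised by this data, so its treatment here is an assumption). session_sums is a hypothetical helper, not Flink code.

```python
from collections import defaultdict

# (bidtime in minutes since midnight, price, item, bidder) from the Bid table
BIDS = [(487, 2, "A", "takidau"), (485, 1, "A", "klk"),
        (489, 10, "B", "takidau"), (488, 3, "A", "klk"),
        (497, 20, "B", "klk")]

def fmt(minutes):
    """Convert minutes since midnight to "H:MM"."""
    return f"{minutes // 60}:{minutes % 60:02d}"

def session_sums(bids, gap):
    """Return (bidder, window_start, window_end, total) per session."""
    by_bidder = defaultdict(list)
    for t, price, _item, bidder in bids:
        by_bidder[bidder].append((t, price))

    result = []
    for bidder, rows in by_bidder.items():
        rows.sort()                        # process each bidder's bids in time order
        start, end, total = None, None, 0
        for t, price in rows:
            if start is not None and t < end:
                end = t + gap              # bid falls inside the session: extend it
                total += price
            else:
                if start is not None:      # close the previous session
                    result.append((bidder, fmt(start), fmt(end), total))
                start, end, total = t, t + gap, price
        result.append((bidder, fmt(start), fmt(end), total))
    return result

sessions = session_sums(BIDS, gap=5)
print(sessions)
```

The session end is always the last bid time plus the gap, which is why takidau's session ends at 8:14 even though the last bid was at 8:09.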
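Summary of window use cases
Window | Use case
Tumbling window | BI analysis: hourly GMV, number of orders, item sales rankings; in general, any metric computed per hour or per day
Sliding window | Monitoring and alerting: maximum and minimum CPU usage over the last hour, and so on; in general, any "last 1 hour" or "last 1 minute" semantics
Session window | User behavior analysis: number of clicks and favorites per user within each session
Cumulative window | A fixed start and end time with the metric recomputed at regular intervals, for example the current day's cumulative UV computed every ten minutes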
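Multidimensional analysis with window TVFs
The earlier examples each group by one fixed set of columns. What if we want to group by several grouping sets at once? Do we have to write several SQL statements and run them separately?
Standardizing the window TVF syntax brings more flexibility and extensibility: multidimensional analysis can be done directly on top of a window function, using GROUPING SETS, ROLLUP, or CUBE. Before version 1.13, we would probably have had to run a separate SQL aggregation for each grouping and then UNION the results to get a similar effect. Now this kind of multidimensional analysis is supported directly on window TVFs.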
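Requirement:
Every ten minutes, compute the total price (SUM(price)) grouped by supplier_id and grouped by item separately, the total without grouping by either column, and the total grouped by supplier_id and item together: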
Traditional approach: multiple SQL statements
SELECT window_start, window_end, supplier_id, SUM(price) AS price
FROM TABLE(
  TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, supplier_id;

SELECT window_start, window_end, item, SUM(price) AS price
FROM TABLE(
  TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, item;

SELECT window_start, window_end, SUM(price) AS price
FROM TABLE(
  TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;

SELECT window_start, window_end, supplier_id, item, SUM(price) AS price
FROM TABLE(
  TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, supplier_id, item;
Implementation with a window TVF
SELECT window_start, window_end, window_time, supplier_id, item, SUM(price) AS price
FROM TABLE(
  TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, window_time,
  GROUPING SETS ((supplier_id, item), (supplier_id), (item), ());
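Note: window_start and window_end must appear in the GROUP BY clause, outside the GROUPING SETS. window_time must be listed there as well whenever it is selected, as in this query.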
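To make the effect of the grouping sets concrete, the plain Python sketch below simulates the result over the six rows of the bids file. It is only an illustration of the semantics (one aggregation per grouping set, with None standing in for SQL NULL in the columns that are not grouped), not how Flink executes the query. The helper name grouping_sets_sum is hypothetical, and prices are simplified to floats.

```python
from collections import defaultdict

# (bidtime, price, item, supplier_id) rows from the bids file
ROWS = [("8:05", 4.0, "C", "supplier1"), ("8:07", 2.0, "A", "supplier1"),
        ("8:09", 5.0, "D", "supplier2"), ("8:11", 3.0, "B", "supplier2"),
        ("8:13", 1.0, "E", "supplier1"), ("8:17", 6.0, "F", "supplier2")]

GROUPING_SETS = [("supplier_id", "item"), ("supplier_id",), ("item",), ()]

def grouping_sets_sum(rows, size, grouping_sets):
    """Sum prices per (window_start, supplier_id, item) for every grouping set.

    window_start is in minutes since midnight; a column that is not part of
    the current grouping set is reported as None (SQL NULL).
    """
    totals = defaultdict(float)
    for bidtime, price, item, supplier_id in rows:
        h, m = bidtime.split(":")
        t = int(h) * 60 + int(m)
        window_start = t - t % size        # tumbling window assignment
        for gs in grouping_sets:
            key = (window_start,
                   supplier_id if "supplier_id" in gs else None,
                   item if "item" in gs else None)
            totals[key] += price
    return dict(totals)

result = grouping_sets_sum(ROWS, 10, GROUPING_SETS)
for key in sorted(result, key=str):
    print(key, result[key])
```

Each window produces rows for all four grouping sets in a single pass, which is what replaces the four separate statements of the traditional approach.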