需求
每⼩时求卖得最⽕的3件商品。
分析
1. 每⼩时计算⼀次指标,所以得⽤滚动窗⼝(窗⼝⻓度1⼩时)
2. 最⽕的3件商品,显然就是求商品交易次数的TOP N,所以需要根据商品分组,因此是⼀个分组滚动窗⼝,每 个窗⼝包含⼀个商品ID每⼩时的订单数据,求交易次数,即count(*)。输出(商品ID,window_time,交易次 数)
3. 每个窗⼝输出了⼀个商品ID当前1⼩时的成交次数,接下来按照window_time分组、组内按照交易次数排序, 取交易次数最多的N个即可
数据准备
叶修洁,100,3,2022-02-1716:18:20卢智宸,200,1,2022-02-1716:18:22廖天翊,300,2,2022-02-1716:18:24谢煜城,200,5,2022-02-1716:18:29程乐驹,200,1,2022-02-1716:18:33龚擎宇,200,10,2022-02-1716:18:37⽯煜城,300,1,2022-02-1716:18:42⾦楷瑞,300,1,2022-02-1716:18:46⽥烨磊,500,2,2022-02-1716:18:50杜浩宇,400,7,2022-02-1716:18:54
建表
SET'execution.target'='local'; SET'sql-client.execution.result-mode'='tableau'; CREATETABLEorders ( `user`STRING, productIdBIGINT, amountINT, orderTpTIMESTAMP(0), WATERMARKFORorderTpASorderTp-INTERVAL'1'SECOND) WITH ( 'connector'='filesystem', 'path'='/home/hadoop/test/orders/orders', 'format'='csv'); descorders; SELECT*FROMorders;
group windows 实现
这⾥直接⽤Flink SQL来实现,Table API的写法类似就不重复了,为了演示效果我们就每10秒计算⼀次Top 2的热⻔ 商品:
1. 每⼩时计算⼀次指标,所以得⽤滚动窗⼝(窗⼝⻓度1⼩时)
2. 最⽕的3件商品,显然就是求商品交易次数的TOP N,所以需要根据商品分组,因此是⼀个分组滚动窗⼝,每 个窗⼝包含⼀个商品ID每⼩时的订单数据,求交易次数,即count(*)。输出(商品ID,window_time,交易次 数)
3. 每个窗⼝输出了⼀个商品ID当前1⼩时的成交次数,接下来按照window_time分组、组内按照交易次数排序, 取交易次数最多的N个即可
CREATETABLEorders ( `user`STRING, productIdBIGINT, amountINT, orderTpTIMESTAMP(0), WATERMARKFORorderTpASorderTp-INTERVAL'1'SECOND) --1、SQL实现滚动时间窗⼝,窗⼝中计算商品交易次数CREATEVIEWpcntwindowASselectproductId, count(productId) aspcnt, TUMBLE_END(orderTp, INTERVAL'10'SECOND) ASwindow_endfromordersGROUPBYTUMBLE(orderTp, INTERVAL'10'SECOND), productId; --2、按照窗⼝结束时间分组,并按交易次数排序后排名(注意不是拿窗⼝结束时间当做时间属性的处理哈,就是基于它分个组是没问题的) CREATEVIEWpcntrankASselectproductId, pcnt, window_end, ROW_NUMBER() over (partitionbywindow_endorderbypcntDESC) asrownumfrompcntwindow;--3、求TopNselect*frompcntrankwhererownum<=2;
函数 |
描述 |
结果举例 |
ROW_NUMBER |
为查询出来的分区内每⼀⾏记录⽣成⼀个序号,依次排序且不会重复,注意 使⽤row_number函数时必须要⽤over⼦句选择对某⼀列进⾏排序才能⽣成 序号 |
1,2,3,4,5 |
RANK |
streaming模式下不⽀持 简单来说rank函数就是对查询出来的记录进⾏排名,与row_number函数不 同的是,rank函数考虑到了over⼦句中排序字段值相同的情况,如果使⽤ rank函数来⽣成序号,over⼦句中排序字段值相同的序号是⼀样的,后⾯字 段值不相同的序号将跳过相同的排名号排下⼀个,也就是相关⾏之前的排名 数加⼀ |
1,2,2,4,5 |
DENSE_RANK |
dense_rank函数的功能与rank函数类似,dense_rank函数在⽣成序号时是 连续的,⽽rank函数⽣成的序号有可能不连续。dense_rank函数出现相同 排名时,将不跳过相同排名号,rank值紧接上⼀次的rank值 |
1,2,2,3,4 |
window TVF 实现
SELECT [column_list] FROM ( SELECT [column_list], ROW_NUMBER() OVER (PARTITIONBYwindow_start, window_end [, col_key1...] ORDERBYcol1 [asc|desc][, col2 [asc|desc]...]) ASrownumFROMtable_name) --tablename替换为windowTVF即可WHERErownum<=N [ANDconditions]
代码实现
select*from ( selectproductId,pcnt,window_end,ROW_NUMBER() over (partitionbywindow_start, window_endorderbypcntDESC) asrownumfrom( SELECTproductId,window_start,window_end,count(productId) aspcntFROMTABLE(TUMBLE(TABLEorders, DESCRIPTOR(orderTp), INTERVAL'10'SECOND)) GROUPbyproductId,window_start, window_end ) ) WHERErownum<=2;
注意:
Window TopN 要求 PARTITION BY ⼦句包含Window TVF 的开始和结束列;将来,如果Window TVF 是 TUMBLE 或 HOP,我们还可以简化 PARTITION BY ⼦句以仅包含开始和结束列