FlinkSQL Window TOP N 实现

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: group windows 实现window TVF 实现

需求

每⼩时求卖得最⽕的3件商品。


分析

1. 每⼩时计算⼀次指标,所以得⽤滚动窗⼝(窗⼝⻓度1⼩时)


2. 最⽕的3件商品,显然就是求商品交易次数的TOP N,所以需要根据商品分组,因此是⼀个分组滚动窗⼝,每 个窗⼝包含⼀个商品ID每⼩时的订单数据,求交易次数,即count(*)。输出(商品ID,window_time,交易次 数)


3. 每个窗⼝输出了⼀个商品ID当前1⼩时的成交次数,接下来按照window_time分组、组内按照交易次数排序, 取交易次数最多的N个即可



数据准备


叶修洁,100,3,2022-02-1716:18:20卢智宸,200,1,2022-02-1716:18:22廖天翊,300,2,2022-02-1716:18:24谢煜城,200,5,2022-02-1716:18:29程乐驹,200,1,2022-02-1716:18:33龚擎宇,200,10,2022-02-1716:18:37⽯煜城,300,1,2022-02-1716:18:42⾦楷瑞,300,1,2022-02-1716:18:46⽥烨磊,500,2,2022-02-1716:18:50杜浩宇,400,7,2022-02-1716:18:54


建表


SET'execution.target'='local';
SET'sql-client.execution.result-mode'='tableau';
CREATETABLEorders (
`user`STRING,
productIdBIGINT,
amountINT,
orderTpTIMESTAMP(0),
WATERMARKFORorderTpASorderTp-INTERVAL'1'SECOND) WITH (
'connector'='filesystem',
'path'='/home/hadoop/test/orders/orders',
'format'='csv');
descorders;
SELECT*FROMorders;


group windows 实现


这⾥直接⽤Flink SQL来实现,Table API的写法类似就不重复了,为了演示效果我们就每10秒计算⼀次Top 2的热⻔ 商品:


1. 每⼩时计算⼀次指标,所以得⽤滚动窗⼝(窗⼝⻓度1⼩时)


2. 最⽕的3件商品,显然就是求商品交易次数的TOP N,所以需要根据商品分组,因此是⼀个分组滚动窗⼝,每 个窗⼝包含⼀个商品ID每⼩时的订单数据,求交易次数,即count(*)。输出(商品ID,window_time,交易次 数)


3. 每个窗⼝输出了⼀个商品ID当前1⼩时的成交次数,接下来按照window_time分组、组内按照交易次数排序, 取交易次数最多的N个即可



CREATETABLEorders (
`user`STRING,
productIdBIGINT,
amountINT,
orderTpTIMESTAMP(0),
WATERMARKFORorderTpASorderTp-INTERVAL'1'SECOND)
--1、SQL实现滚动时间窗⼝,窗⼝中计算商品交易次数CREATEVIEWpcntwindowASselectproductId,
count(productId) aspcnt,
TUMBLE_END(orderTp, INTERVAL'10'SECOND) ASwindow_endfromordersGROUPBYTUMBLE(orderTp, INTERVAL'10'SECOND),
productId;
--2、按照窗⼝结束时间分组,并按交易次数排序后排名(注意不是拿窗⼝结束时间当做时间属性的处理哈,就是基于它分个组是没问题的)
CREATEVIEWpcntrankASselectproductId,
pcnt,
window_end,
ROW_NUMBER() over (partitionbywindow_endorderbypcntDESC) asrownumfrompcntwindow;--3、求TopNselect*frompcntrankwhererownum<=2;


函数

描述

结果举例

ROW_NUMBER

为查询出来的分区内每⼀⾏记录⽣成⼀个序号,依次排序且不会重复,注意 使⽤row_number函数时必须要⽤over⼦句选择对某⼀列进⾏排序才能⽣成 序号

1,2,3,4,5

RANK

streaming模式下不⽀持 简单来说rank函数就是对查询出来的记录进⾏排名,与row_number函数不 同的是,rank函数考虑到了over⼦句中排序字段值相同的情况,如果使⽤ rank函数来⽣成序号,over⼦句中排序字段值相同的序号是⼀样的,后⾯字 段值不相同的序号将跳过相同的排名号排下⼀个,也就是相关⾏之前的排名 数加⼀

1,2,2,4,5

DENSE_RANK

dense_rank函数的功能与rank函数类似,dense_rank函数在⽣成序号时是 连续的,⽽rank函数⽣成的序号有可能不连续。dense_rank函数出现相同 排名时,将不跳过相同排名号,rank值紧接上⼀次的rank值

1,2,2,3,4




window TVF 实现



SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER (PARTITIONBYwindow_start, window_end [, col_key1...]
ORDERBYcol1 [asc|desc][, col2 [asc|desc]...]) ASrownumFROMtable_name) --tablename替换为windowTVF即可WHERErownum<=N [ANDconditions]


代码实现

select*from (
selectproductId,pcnt,window_end,ROW_NUMBER() over (partitionbywindow_start,
window_endorderbypcntDESC) asrownumfrom(
SELECTproductId,window_start,window_end,count(productId) aspcntFROMTABLE(TUMBLE(TABLEorders, DESCRIPTOR(orderTp), INTERVAL'10'SECOND))
GROUPbyproductId,window_start, window_end )
)
WHERErownum<=2;


注意:

Window TopN 要求 PARTITION BY ⼦句包含Window TVF 的开始和结束列;将来,如果Window TVF 是 TUMBLE 或 HOP,我们还可以简化 PARTITION BY ⼦句以仅包含开始和结束列


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
SQL 存储 监控
FlinkSQL窗口新特性(Window TVF)
理解Window TVF Window TVF使用
FlinkSQL窗口新特性(Window TVF)
window.addEventListener注册滚动scroll事件不生效
window.addEventListener注册滚动scroll事件不生效
|
4月前
|
数据安全/隐私保护 流计算
Flink的Interval Join是基于水印(Watermark)和时间窗口(Time Window)实现的
Flink的Interval Join是基于水印(Watermark)和时间窗口(Time Window)实现的
94 2
|
4月前
|
SQL 流计算
如果使用flinksql进行窗口关联操作,然后结合使用自定义表聚合函数,怎么对多表的window_start和window_end进行聚合操作?
如果使用flinksql进行窗口关联操作,然后结合使用自定义表聚合函数,怎么对多表的window_start和window_end进行聚合操作?
40 2
|
9月前
offset、client、scroll三大系列
offset、client、scroll三大系列
45 0
|
编解码
怎么解决 ie 中获取 window.screen.width 不正确?
怎么解决 ie 中获取 window.screen.width 不正确?
99 0
怎么解决 ie 中获取 window.screen.width 不正确?
|
BI API 数据处理
带你理解并使用flink中的Time、Window(窗口)、Windows Function(窗口函数)
flink中,streaming流式计算被设计为用于处理无限数据集的数据处理引擎,其中无限数据集是指一种源源不断有数据过来的数据集,window (窗口)将无界数据流切割成为有界数据流进行处理的方式。实现方式是将流分发到有限大小的桶(bucket)中进行分析。flink 中的streaming定义了多种流式处理的时间,Event Time(事件时间)、Ingestion Time(接收时间)、Processing Time(处理时间)。
525 0
带你理解并使用flink中的Time、Window(窗口)、Windows Function(窗口函数)
|
SQL
SQL TOP PERCENT 实例
SQL TOP PERCENT 实例
55 0
|
存储 分布式计算 API
Flink Window 、Time(一)| 学习笔记
快速学习 Flink Window 、Time 。
117 0
|
存储 Java Apache
Flink Window 、Time(二)| 学习笔记
快速学习 Flink Window 、Time 。
122 0