FlinkSQL Window TOP N 实现

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: group windows 实现window TVF 实现

需求

每⼩时求卖得最⽕的3件商品。


分析

1. 每⼩时计算⼀次指标,所以得⽤滚动窗⼝(窗⼝⻓度1⼩时)


2. 最⽕的3件商品,显然就是求商品交易次数的TOP N,所以需要根据商品分组,因此是⼀个分组滚动窗⼝,每 个窗⼝包含⼀个商品ID每⼩时的订单数据,求交易次数,即count(*)。输出(商品ID,window_time,交易次 数)


3. 每个窗⼝输出了⼀个商品ID当前1⼩时的成交次数,接下来按照window_time分组、组内按照交易次数排序, 取交易次数最多的N个即可



数据准备


叶修洁,100,3,2022-02-1716:18:20卢智宸,200,1,2022-02-1716:18:22廖天翊,300,2,2022-02-1716:18:24谢煜城,200,5,2022-02-1716:18:29程乐驹,200,1,2022-02-1716:18:33龚擎宇,200,10,2022-02-1716:18:37⽯煜城,300,1,2022-02-1716:18:42⾦楷瑞,300,1,2022-02-1716:18:46⽥烨磊,500,2,2022-02-1716:18:50杜浩宇,400,7,2022-02-1716:18:54


建表


SET'execution.target'='local';
SET'sql-client.execution.result-mode'='tableau';
CREATETABLEorders (
`user`STRING,
productIdBIGINT,
amountINT,
orderTpTIMESTAMP(0),
WATERMARKFORorderTpASorderTp-INTERVAL'1'SECOND) WITH (
'connector'='filesystem',
'path'='/home/hadoop/test/orders/orders',
'format'='csv');
descorders;
SELECT*FROMorders;


group windows 实现


这⾥直接⽤Flink SQL来实现,Table API的写法类似就不重复了,为了演示效果我们就每10秒计算⼀次Top 2的热⻔ 商品:


1. 每⼩时计算⼀次指标,所以得⽤滚动窗⼝(窗⼝⻓度1⼩时)


2. 最⽕的3件商品,显然就是求商品交易次数的TOP N,所以需要根据商品分组,因此是⼀个分组滚动窗⼝,每 个窗⼝包含⼀个商品ID每⼩时的订单数据,求交易次数,即count(*)。输出(商品ID,window_time,交易次 数)


3. 每个窗⼝输出了⼀个商品ID当前1⼩时的成交次数,接下来按照window_time分组、组内按照交易次数排序, 取交易次数最多的N个即可



CREATETABLEorders (
`user`STRING,
productIdBIGINT,
amountINT,
orderTpTIMESTAMP(0),
WATERMARKFORorderTpASorderTp-INTERVAL'1'SECOND)
--1、SQL实现滚动时间窗⼝,窗⼝中计算商品交易次数CREATEVIEWpcntwindowASselectproductId,
count(productId) aspcnt,
TUMBLE_END(orderTp, INTERVAL'10'SECOND) ASwindow_endfromordersGROUPBYTUMBLE(orderTp, INTERVAL'10'SECOND),
productId;
--2、按照窗⼝结束时间分组,并按交易次数排序后排名(注意不是拿窗⼝结束时间当做时间属性的处理哈,就是基于它分个组是没问题的)
CREATEVIEWpcntrankASselectproductId,
pcnt,
window_end,
ROW_NUMBER() over (partitionbywindow_endorderbypcntDESC) asrownumfrompcntwindow;--3、求TopNselect*frompcntrankwhererownum<=2;


函数

描述

结果举例

ROW_NUMBER

为查询出来的分区内每⼀⾏记录⽣成⼀个序号,依次排序且不会重复,注意 使⽤row_number函数时必须要⽤over⼦句选择对某⼀列进⾏排序才能⽣成 序号

1,2,3,4,5

RANK

streaming模式下不⽀持 简单来说rank函数就是对查询出来的记录进⾏排名,与row_number函数不 同的是,rank函数考虑到了over⼦句中排序字段值相同的情况,如果使⽤ rank函数来⽣成序号,over⼦句中排序字段值相同的序号是⼀样的,后⾯字 段值不相同的序号将跳过相同的排名号排下⼀个,也就是相关⾏之前的排名 数加⼀

1,2,2,4,5

DENSE_RANK

dense_rank函数的功能与rank函数类似,dense_rank函数在⽣成序号时是 连续的,⽽rank函数⽣成的序号有可能不连续。dense_rank函数出现相同 排名时,将不跳过相同排名号,rank值紧接上⼀次的rank值

1,2,2,3,4




window TVF 实现



SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER (PARTITIONBYwindow_start, window_end [, col_key1...]
ORDERBYcol1 [asc|desc][, col2 [asc|desc]...]) ASrownumFROMtable_name) --tablename替换为windowTVF即可WHERErownum<=N [ANDconditions]


代码实现

select*from (
selectproductId,pcnt,window_end,ROW_NUMBER() over (partitionbywindow_start,
window_endorderbypcntDESC) asrownumfrom(
SELECTproductId,window_start,window_end,count(productId) aspcntFROMTABLE(TUMBLE(TABLEorders, DESCRIPTOR(orderTp), INTERVAL'10'SECOND))
GROUPbyproductId,window_start, window_end )
)
WHERErownum<=2;


注意:

Window TopN 要求 PARTITION BY ⼦句包含Window TVF 的开始和结束列;将来,如果Window TVF 是 TUMBLE 或 HOP,我们还可以简化 PARTITION BY ⼦句以仅包含开始和结束列


相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
消息中间件 API 数据处理
Flink常见面试问题(附答案)
Apache Flink是开源的流批处理框架,提供低延迟、高吞吐的数据处理。与Hadoop不同,Flink专注于实时数据流。其核心特性包括事件时间和处理时间的概念,事件时间通过水印处理乱序事件。Flink通过检查点实现容错,支持滚动、滑动和会话窗口进行流数据处理。状态后端用于管理应用程序状态,水印用于处理延迟数据。Flink与Kafka集成能保证事件顺序,支持多种连接器如Kafka、JDBC等。其处理延迟数据、乱序事件的能力,以及Exactly-Once语义,使其在大规模数据处理中具有优势。Flink还支持表格API和DataStream API,以及多种容错和性能优化策略。
885 2
Flink常见面试问题(附答案)
|
8月前
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
1608 27
|
11月前
|
存储 自然语言处理 Java
Elasticsearch写入优化
【10月更文挑战第3天】Elasticsearch:从写入原理谈写入优化
298 2
|
10月前
|
开发工具
如何设置单元格的填充颜色?
【10月更文挑战第22天】如何设置单元格的填充颜色?
442 2
|
SQL BI HIVE
【Hive SQL 每日一题】统计用户留存率
用户留存率是衡量产品成功的关键指标,表示用户在特定时间内持续使用产品的比例。计算公式为留存用户数除以初始用户数。例如,游戏发行后第一天有10000玩家,第七天剩5000人,第一周留存率为50%。提供的SQL代码展示了如何根据用户活动数据统计每天的留存率。需求包括计算系统上线后的每日留存率,以及从第一天开始的累计N日留存率。通过窗口函数`LAG`和`COUNT(DISTINCT user_id)`,可以有效地分析用户留存趋势。
1080 1
|
存储 SQL 关系型数据库
数据仓库、数据湖、流批一体,终于有大神讲清楚了!
数据仓库,数据湖,包括Flink社区提的流批一体,它们到底能解决什么问题?今天将由阿里云研究员从解决业务问题出发,将问题抽丝剥茧,从技术维度娓娓道来:为什么你需要数据湖或者数据仓库解决方案?它的核心难点与核心问题在哪?如果想稳定落地,系统设计该怎么做?
4933 0
|
SQL 存储
Hologres支持UPSERT操作
Hologres支持UPSERT操作
250 2
|
SQL 大数据 HIVE
大数据技术之Hive SQL题库-高级
大数据技术之Hive SQL题库-高级
672 0
|
存储 消息中间件 缓存
flink面试问题总结(1)
flink面试问题总结(1)