Fink SQL 实践之OVER窗口

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介:

Fink SQL 实践之OVER窗口
问题场景
Flink SQL 是一种使用 SQL 语义设计的开发语言,用它解决具体业务需求是一种全新体验,类似于从过程式编程到函数式编程的转变一样,需要一个不断学习和实践的过程。在看完了 Flink 官方文档中 SQL 部分 ,以及官方提供的 SQL Training 后,觉得自己装备了必杀技准备横扫需求了,这时先来一个简单的营销需求:实时计算今天用户加页面维度的浏览次数,即实时输出PV,下游根据某些规则比如今天用户浏览A页面达到N次触发某种动作。当我使用 group by user 并输出到 kafka 中时,发生异常: AppendStreamTableSink requires that Table has only insert changes,这是因为不带时间窗口的分组聚合不支持流式输出,依赖 Flink Dynamic Tables 的实现原理。修改后加上时间窗口但是不支持实时触发,只在窗口结束时输出一次结果,总之仔细翻了个遍,发现上面资料中都没有类似场景,不会这么简单都实现不了吧?

首先使用标准 SQL 中的 group by 对数据进行分组时,每个分组只能输出一行,其中可以有聚合结果和 group by 的列,如果没有开启 ONLY_FULL_GROUP_BY 模式(MySQL 5.7 开始默认开启),还可以输出任意一行值的非 group by 的列,而 Flink 则是始终开启,不支持非聚合列输出。反过来如果我们想要在聚合计算后输出所有明细,即每行明细带上聚合结果,这时只能使用 OVER窗口和它的分组 Partition(还有一个思路是聚合结果和明细再关联查询)。

此外在 Flink 中提供了一系列时间窗口函数,如滚动窗口 Tumble、滑动窗口 Hop、会话窗口 Session 等,可以将时间窗口作为 group by 分组项之一,但是正如前面所说它们都只能在窗口结束时触发计算输出,无法满足实时触发需求,这点也只能通过 OVER 窗口实现。

OVER 窗口介绍
OVER 窗口是传统数据库的窗口函数,它定义了一行记录向前多少行或向后多少行作为一个窗口,以此范围进行分组 Partion 和聚合计算。初次接触 OVER 窗口觉得不好理解,但实际上它是 SQL 标准支持的特性,包括 Oracle、MySQL、PostgreSQL 等主流数据库都已支持,下面是来自 PostgreSQL 文档 3.5. Window Functions 中的介绍:

A window function performs a calculation across a set of table rows that are somehow related to the current row. This is comparable to the type of calculation that can be done with an aggregate function. However, window functions do not cause rows to become grouped into a single output row like non-window aggregate calls would. Instead, the rows retain their separate identities. Behind the scenes, the window function is able to access more than just the current row of the query result.

其中有具体 SQL 例子辅助理解,此外在维基百科有个更简短介绍 SQL window function ,有了以上基础后再来看 Flink SQL 中的 OVER 窗口,官方文档对 OVER 窗口只有几行描述,SQL Training 中甚至没有涉及,这里我们可以参考阿里云实时计算文档 OVER窗口 ,为了解释后面例子,这里我们只关注其中 Bounded RANGE OVER 类型:具有相同时间值(时间戳毫秒数)的元素视为同一计算行,它对应一个向前有限时间范围的行到该行的一个窗口(因为每一个新到的计算行是该窗口最后一个,以后的还没发生当然无法计算),每一个窗口都会触发一次计算和输出。

OVER 窗口应用示例
首先通过 DDL 定义源数据表和结果表,如下输入是用户行为消息,输出到计算结果消息。

CREATE TABLE user_action (

`user_id` VARCHAR,
`page_id` VARCHAR,
`action_type` VARCHAR,
`event_time` TIMESTAMP,
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

) WITH (

'connector.type' = 'kafka',
'connector.topic' = 'user_action',
'connector.version' = '0.11',
'connector.properties.0.key' = 'bootstrap.servers',
'connector.properties.0.value' = 'xxx:9092',
'connector.startup-mode' = 'latest-offset',
'update-mode' = 'append',
'...' = '...'

);

CREATE TABLE agg_result (

`user_id` VARCHAR,
`page_id` VARCHAR,
`result_type` VARCHAR,
`result_value` BIGINT

) WITH (

'connector.type' = 'kafka',
'connector.topic' = 'agg_result',
'...' = '...'

);
场景一,实时触发的最近2小时用户+页面维度的点击量,注意窗口是向前2小时,类似于实时触发的滑动窗口。

insert into

agg_result

select

user_id,
page_id,
'click-type1' as result_type
count(1) OVER (
    PARTITION BY user_id, page_id
    ORDER BY event_time 
    RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW
) as result_value

from

user_action

where

action_type = 'click'

场景二,实时触发的当天用户+页面维度的浏览量,这就是开篇问题解法,其中多了一个日期维度分组条件,这样就做到输出结果从滑动时间转为固定时间(根据时间区间分组),因为 WATERMARK 机制,今天并不会有昨天数据到来(如果有都被自动抛弃),因此只会输出今天的分组结果。

insert into

agg_result

select

user_id,
page_id,
'view-type1' as result_type
count(1) OVER (
    PARTITION BY user_id, page_id, DATE_FORMAT(event_time, 'yyyyMMdd')
    ORDER BY event_time 
    RANGE BETWEEN INTERVAL '1' DAY PRECEDING AND CURRENT ROW
) as result_value

from

user_action

where

action_type = 'view'

场景三,实时触发的当天用户+页面点击率 CTR(Click-Through-Rate),这相比前面增加了多个 OVER 聚合计算,可以将窗口定义写在最后。注意示例中缺少了类型转换,因为除法结果是 decimal,也缺少精度处理函数 ROUND。

insert into

agg_result

select

user_id,
page_id,
'ctr-type1' as result_type,
sum(
    case
        when action_type = 'click' then 1 else 0
    end
) OVER w 
/ 
if(
    sum(
        case
            when action_type = 'view' then 1 else 0
        end
    ) OVER w = 0,
    1,
    sum(
        case
            when action_type = 'view' then 1 else 0
        end
    ) OVER w
)
as result_value

from

user_action

where

1 = 1 

WINDOW w AS (

    PARTITION BY user_id, page_id, DATE_FORMAT(event_time,'yyyyMMdd')
    ORDER BY event_time 
    RANGE BETWEEN INTERVAL '1' DAY PRECEDING AND CURRENT ROW
)

此外,OVER 窗口聚合还可以支持查询子句、关联查询、UNION ALL 等组合,并可以实现对关联出来的列进行聚合等复杂情况。

OVER 窗口问题和优化
在底层实现中,所有细分 OVER 窗口的数据都是共享的,只存一份,这点不像滑动窗口会保存多份窗口数据。但是 OVER 窗口会把所有数据明细存在状态后端中(内存、RocksDB 或 HDFS),每一次窗口计算后会清除过期数据。因此如果向前窗口时间较大,或数据明细过多,可能会占用大量内存,即使通过 RocksDB 存在磁盘上,也有因为磁盘访问慢导致性能下降进而产生反压问题。在实现源码 RowTimeRangeBoundedPrecedingFunction 可以看到虽然每次窗口计算时新增聚合值和减少过期聚合值是增量式的,不用遍历全部窗口明细,但是为了计算过期数据,即超过 PRECEDING 的数据,仍然需要把存储的那些时间戳全部拿出来遍历,判断是否过期,以及是否要减少聚合值。我们尝试了通过数据有序性减少查询操作,但是效果并不明显,目前主要是配置调优和加大任务分片数进行优化。

原文地址https://www.cnblogs.com/pyx0/p/flink-sql-over.html

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
2月前
|
SQL Java 关系型数据库
在 RDB 上跑 SQL------SPL 轻量级多源混算实践 1
SPL 支持通过 JDBC 连接 RDB,可动态生成 SQL 并传参,适用于 Java 与 SQL 结合的各类场景。本文以 MySQL 为例,演示如何配置数据库连接、编写 SPL 脚本查询 2024 年订单数据,并支持参数过滤和 SQL 混合计算。脚本可在 IDE 直接执行或集成至 Java 应用调用。
|
30天前
|
SQL 关系型数据库 Java
SQL 移植--SPL 轻量级多源混算实践 7
不同数据库的 SQL 语法存在差异,尤其是函数写法不同,导致 SQL 移植困难。SPL 提供 sqltranslate 函数,可将标准 SQL 转换为特定数据库语法,实现 SQL 语句在不同数据库间的无缝迁移,支持多种数据库函数映射与自定义扩展。
|
5月前
|
SQL 存储 关系型数据库
SQL优化策略与实践:组合索引与最左前缀原则详解
本文介绍了SQL优化的多种方式,包括优化查询语句(避免使用SELECT *、减少数据处理量)、使用索引(创建合适索引类型)、查询缓存、优化表结构、使用存储过程和触发器、批量处理以及分析和监控数据库性能。同时,文章详细讲解了组合索引的概念及其最左前缀原则,即MySQL从索引的最左列开始匹配条件,若跳过最左列,则索引失效。通过示例代码,展示了如何在实际场景中应用这些优化策略,以提高数据库查询效率和系统响应速度。
181 10
|
5月前
|
SQL 安全 关系型数据库
SQL注入之万能密码:原理、实践与防御全解析
本文深入解析了“万能密码”攻击的运行机制及其危险性,通过实例展示了SQL注入的基本原理与变种形式。文章还提供了企业级防御方案,包括参数化查询、输入验证、权限控制及WAF规则配置等深度防御策略。同时,探讨了二阶注入和布尔盲注等新型攻击方式,并给出开发者自查清单。最后强调安全防护需持续改进,无绝对安全,建议使用成熟ORM框架并定期审计。技术内容仅供学习参考,严禁非法用途。
806 0
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
371 15
|
SQL 存储 Unix
Flink SQL 在快手实践问题之设置 Window Offset 以调整窗口划分如何解决
Flink SQL 在快手实践问题之设置 Window Offset 以调整窗口划分如何解决
163 2
|
11月前
|
SQL 关系型数据库 MySQL
Go语言项目高效对接SQL数据库:实践技巧与方法
在Go语言项目中,与SQL数据库进行对接是一项基础且重要的任务
223 11
|
11月前
|
SQL 存储 关系型数据库
添加数据到数据库的SQL语句详解与实践技巧
在数据库管理中,添加数据是一个基本操作,它涉及到向表中插入新的记录
1298 4
|
11月前
|
SQL 关系型数据库 数据库
SQL数据库:核心原理与应用实践
随着信息技术的飞速发展,数据库管理系统已成为各类组织和企业中不可或缺的核心组件。在众多数据库管理系统中,SQL(结构化查询语言)数据库以其强大的数据管理能力和灵活性,广泛应用于各类业务场景。本文将深入探讨SQL数据库的基本原理、核心特性以及实际应用。一、SQL数据库概述SQL数据库是一种关系型数据库
363 5
|
11月前
|
SQL 开发框架 .NET
ASP连接SQL数据库:从基础到实践
随着互联网技术的快速发展,数据库与应用程序之间的连接成为了软件开发中的一项关键技术。ASP(ActiveServerPages)是一种在服务器端执行的脚本环境,它能够生成动态的网页内容。而SQL数据库则是一种关系型数据库管理系统,广泛应用于各类网站和应用程序的数据存储和管理。本文将详细介绍如何使用A
211 3

热门文章

最新文章