Flink(十四)【Flink SQL(中)查询】(1)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink(十四)【Flink SQL(中)查询】

前言

       接着上次写剩下的查询继续学习。

Flink SQL 查询

环境准备:

# 1. 先启动 hadoop
myhadoop start
# 2. 不需要启动 flink 只启动yarn-session即可
/opt/module/flink-1.17.0/bin/yarn-session.sh -d
# 3. 启动 flink sql 的环境 sql-client
./sql-client.sh embedded -s yarn-session

记得第二步:启动 yarn-seesion !!!

注意:我们写 SQL 的时候尽量避免关键字,比如函数名(avg、sum)!

1、分组窗口聚合

       分组窗口起始就是我们之前学过的 滑动窗口、会话窗口、滚动窗口,之所以叫它分组窗口,其实是把它的一个窗口看做一个分组。

       从1.13版本开始,分组窗口聚合已经标记为过时,鼓励使用更强大、更有效的窗口TVF聚合,在这里简单做个介绍。

       直接把窗口自身作为分组key放在GROUP BY之后的,所以也叫“分组窗口聚合”。SQL查询的分组窗口是通过 GROUP BY 子句定义的。类似于使用常规 GROUP BY 语句的查询,窗口分组语句的 GROUP BY 子句中带有一个窗口函数为每个分组计算出一个结果。

       Flink SQL中只支持基于时间的窗口,不支持基于元素个数的窗口。

分组窗口函数

描述

TUMBLE(time_attr, interval)

定义一个滚动窗口。time_attr 是时间属性,也就是你选择的作为时间语义的字段,interval 是窗口长度。

HOP(time_attr, interval, interval)

定义一个跳跃的时间窗口(在 Table API 中称为滑动窗口)。滑动窗口的长度( 第二个 interval 参数 )以及一个滑动的步长(第一个 interval 参数 )。

SESSION(time_attr, interval)

定义一个会话时间窗口。interval 是会话的间隔。

其中,ROWTIME 代表的是事件时间语义,PROCTIME 是处理时间语义。

1)准备数据

CREATE TABLE ws (
  id INT,
  vc INT,
  pt AS PROCTIME(), --处理时间
  et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件时间
  WATERMARK FOR et AS et - INTERVAL '5' SECOND   --watermark
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.id.min' = '1',
  'fields.id.max' = '3',
  'fields.vc.min' = '1',
  'fields.vc.max' = '100'
);
# 设置显示
set sql-client.execution.result-mode=tableau;

我们可以看到,一张 flink sql 的表是允许事件时间和处理时间都存在的,只是水位线只能指定一个。

2)滑动窗口示例(时间属性,窗口长度)

Flink SQL> select id,
> sum(vc) as vcSum,
> tumble_start(et,interval '5' second) as window_start,
> tumble_end(et,interval '5' second) as window_end
> from ws
> group by id,tumble(et,interval '5' second);

查询结果:

可以看到每个 key 的窗口每 5s 滚动一次。

3)滑动窗口(时间属性,滑动步长,窗口长度)

Flink SQL> select id,
> hop_start(et,interval '3' second,interval '5' second) window_start,
> hop_end(et,interval '3' second,interval '5' second) window_end,
> sum(vc) sum_vc
> from ws
> group by id,hop(et,interval '3' second,interval '5' second);

这里我们指定滑动步长为 3 窗口大小为 5,查看运行结果:

我们可以看到,相同 key 的窗口确实间隔 3s,窗口大小为 5s。

3)会话窗口(时间属性,会话间隔)

select  
id,
SESSION_START(et, INTERVAL '5' SECOND)  wstart,
SESSION_END(et, INTERVAL '5' SECOND)  wend,
sum(vc) sumVc
from ws
group by id, SESSION(et, INTERVAL '5' SECOND);

这里因为我们的数据生成器是源源不断生成数据的,而我们指定了会话间隔为 5s,也就是说只有连续 5s 收不到数据窗口才会关闭,所以我们是看不到数据结果的。

2、窗口表值函数(TVF)聚合

前面我们学习的 Group Window 在 Flink13 版本之后已经被标记为过时了,更推荐的是 TVF Window。

对比GroupWindow,TVF窗口更有效和强大。包括:

  • 提供更多的性能优化手段
  • 支持GroupingSets语法
  • 可以在window聚合中使用TopN
  • 提供累积窗口

       对于窗口表值函数(TVF),窗口本身返回的是就是一个表,所以窗口会出现在FROM后面,GROUP BY后面的则是窗口新增的字段 window_start 和 window_end。

FROM TABLE(
窗口类型(TABLE 表名, DESCRIPTOR(时间字段),INTERVAL 时间间隔)
)
GROUP BY [window_start,][window_end,] --可选

1)滚动窗口(tumble)

Flink SQL> select
> id,
> sum(vc) as vc_sum,
> window_start,
> window_end
> from table(    -- 这里的table是关键字 不是表名
> tumble(table ws,descriptor(et),interval '5' second)
> )
> group by window_start,window_end,id; -- 这里的window_start,window_end都是关键字

运行结果:

2)滑动窗口(hop)

注意:在 TVF 中,滑动窗口的大小必须是步长的整数倍,因为 TVF 会对滑动窗口进行一个优化:把滑动窗口按照步长大小划分为 (窗口大小/步长)个滚动窗口,这样一些需要重复计算的数据所在的滚动窗口只需要计算一次即可。

如果不是整数倍,会报错:[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: HOP table function based aggregate requires size must be an integral multiple of slide, but got size xxx ms and slide xxx ms

Flink SQL> select
> id,
> sum(vc) as vc_sum,
> window_start,
> window_end
> from table(
> hop(table ws,descriptor(et),interval '2' second,interval '6' second)
> )
> group by window_start,window_end,id;

我们上面这个滑动窗口会被 TVF 划分为 6/2 也就是 3 个,其中第一个窗口和第二个窗口之间的会有两个重复的计算结果(也就是两个子滚动窗口)。

3)累积窗口(cumulate)

注意:累积窗口的窗口大小必须是步长的整数倍!

Flink SQL> select
> id,
> sum(vc) vc_sum,
> window_start,
> window_end
> from table(
> cumulate(table ws,descriptor(et),interval '2' second,interval '6' second
> ))
> group by window_start,window_end,id;

累积窗口也很常用,需要指定两个参数(窗口的最大长度,窗口的步长),比如统计一天内每个小时的网站访问量就可以开一个累积窗口(窗口的最大长度是 24h,窗口的步长是 1h),语法也很简单,只需要修改一个滑动窗口的一个关键字:cumulate。当窗口结束后会再开启一个窗口,属性和上一个窗口是一样的。

       累积窗口的底层还是一个滚动窗口,当我们定义了一个累积窗口,就相当于开了一个最大的滚动窗口,之后会根据用户指定的步长(也就是触发计算的时间)将这个窗口划分为多个窗口,这些窗口具有相同的起点和不同的终点。

4)grouping sets 多维分析

SELECT 
window_start, 
window_end, 
id , 
SUM(vc) sumVC
FROM TABLE(
  TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end,
rollup( (id) )
--  cube( (id) )
--  grouping sets( (id),()  )
;

这个多维分析用到的时候再详细了解,这里只知道Flink有这个功能就行了。

5)会话窗口

Flink 17 的 TVF 函数窗口其实是支持会话窗口的,但是现在还是实验性的功能。

3、Over 聚合

       回想我们之前 Hive 学过的开窗函数,对每个数据,我们会去计算它的开窗范围进行计算输出。比如对上一行到这一行的数据进行计算;再比如计算第一行数据到当前行。

       OVER 聚合为一系列有序行的每个输入行计算一个聚合值。与 GROUP BY 聚合相比,OVER聚合不会将每个组的结果行数减少为一行。相反,OVER聚合为每个输入行生成一个聚合值。

       可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定义Over windows。

1)语法

SELECT
  -- 聚合函数
  agg_func(agg_col) OVER (
    [PARTITION BY col1[, col2, ...]]
    ORDER BY time_col
    range_definition),
  ...
FROM ...
  • ORDER BY:必须是时间戳列(事件时间、处理时间),只能升序
  • PARTITION BY:标识了聚合窗口的聚合粒度
  • range_definition:这个标识聚合窗口的聚合数据范围,在 Flink 中有两种指定数据范围的方式。第一种为按照行数聚合(rows 关键字),第二种为按照时间区间聚合(range 关键字)
  • 按照时间分区时,对于同一时刻的数据都会被聚合计算,但是按照行数聚合就不会。

2)案例

1. 按照时间区间聚合

统计每个传感器前 10 秒到现在收到的水位数据条数

select 
    id,
    et,
    vc,
    count(vc) over(partition by id order by et range between interval '10' second preceding and current row) cnt
from ws;

运行结果:

也可以用 WINDOW 子句来在SELECT外部单独定义一个OVER窗口,可以多次使用:

select 
    id,
    et,
    vc,
    count(vc) over w as cnt,    -- 这里的 as 可以省略
    sum(vc) over w as sum_vc
from ws
window w as(
    partition by id 
    order by et 
    range between interval '10' second preceding and current row
);

当我们有多个聚合函数并且这几个窗口函数的开窗属性是相同的时候这样写可以简化代码。

2. 按照行数聚合

统计每个传感器前5条到现在数据的平均水位:

select 
id,
et,
vc,
avg(vc) over w as avg
from ws
window w as (
partition by id
order by et
rows between 5 preceding and current row
);

运行结果:


Flink(十四)【Flink SQL(中)查询】(2)https://developer.aliyun.com/article/1532320

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
26天前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
90 15
|
5天前
|
SQL 资源调度 数据库
深入探究SQL查询语句执行过程
深入探究SQL查询语句执行过程
16 2
|
5天前
|
SQL Java
使用java在未知表字段情况下通过sql查询信息
使用java在未知表字段情况下通过sql查询信息
13 1
|
28天前
|
SQL 运维 程序员
一个功能丰富的SQL审核查询平台
一个功能丰富的SQL审核查询平台
|
28天前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
56 2
|
28天前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
31 1
|
10天前
|
SQL
SQL: 巧妙使用CASE WHEN实现查询
文章演示了如何利用SQL中的CASE WHEN语句来有效地进行条件性聚合查询,通过具体示例展示了CASE WHEN在统计分析中的应用技巧。
25 0
|
5月前
|
SQL NoSQL Java
Flink SQL 问题之执行报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
529 2
|
5月前
|
SQL Java 关系型数据库
Flink SQL 问题之用代码执行报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
577 6
|
5月前
|
SQL 消息中间件 Oracle
Flink SQL 问题之写入ES报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
下一篇
无影云桌面