Flink(十四)【Flink SQL(中)查询】(1)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink(十四)【Flink SQL(中)查询】

前言

       接着上次写剩下的查询继续学习。

Flink SQL 查询

环境准备:

# 1. 先启动 hadoop
myhadoop start
# 2. 不需要启动 flink 只启动yarn-session即可
/opt/module/flink-1.17.0/bin/yarn-session.sh -d
# 3. 启动 flink sql 的环境 sql-client
./sql-client.sh embedded -s yarn-session

记得第二步:启动 yarn-seesion !!!

注意:我们写 SQL 的时候尽量避免关键字,比如函数名(avg、sum)!

1、分组窗口聚合

       分组窗口起始就是我们之前学过的 滑动窗口、会话窗口、滚动窗口,之所以叫它分组窗口,其实是把它的一个窗口看做一个分组。

       从1.13版本开始,分组窗口聚合已经标记为过时,鼓励使用更强大、更有效的窗口TVF聚合,在这里简单做个介绍。

       直接把窗口自身作为分组key放在GROUP BY之后的,所以也叫“分组窗口聚合”。SQL查询的分组窗口是通过 GROUP BY 子句定义的。类似于使用常规 GROUP BY 语句的查询,窗口分组语句的 GROUP BY 子句中带有一个窗口函数为每个分组计算出一个结果。

       Flink SQL中只支持基于时间的窗口,不支持基于元素个数的窗口。

分组窗口函数

描述

TUMBLE(time_attr, interval)

定义一个滚动窗口。time_attr 是时间属性,也就是你选择的作为时间语义的字段,interval 是窗口长度。

HOP(time_attr, interval, interval)

定义一个跳跃的时间窗口(在 Table API 中称为滑动窗口)。滑动窗口的长度( 第二个 interval 参数 )以及一个滑动的步长(第一个 interval 参数 )。

SESSION(time_attr, interval)

定义一个会话时间窗口。interval 是会话的间隔。

其中,ROWTIME 代表的是事件时间语义,PROCTIME 是处理时间语义。

1)准备数据

CREATE TABLE ws (
  id INT,
  vc INT,
  pt AS PROCTIME(), --处理时间
  et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件时间
  WATERMARK FOR et AS et - INTERVAL '5' SECOND   --watermark
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.id.min' = '1',
  'fields.id.max' = '3',
  'fields.vc.min' = '1',
  'fields.vc.max' = '100'
);
# 设置显示
set sql-client.execution.result-mode=tableau;

我们可以看到,一张 flink sql 的表是允许事件时间和处理时间都存在的,只是水位线只能指定一个。

2)滑动窗口示例(时间属性,窗口长度)

Flink SQL> select id,
> sum(vc) as vcSum,
> tumble_start(et,interval '5' second) as window_start,
> tumble_end(et,interval '5' second) as window_end
> from ws
> group by id,tumble(et,interval '5' second);

查询结果:

可以看到每个 key 的窗口每 5s 滚动一次。

3)滑动窗口(时间属性,滑动步长,窗口长度)

Flink SQL> select id,
> hop_start(et,interval '3' second,interval '5' second) window_start,
> hop_end(et,interval '3' second,interval '5' second) window_end,
> sum(vc) sum_vc
> from ws
> group by id,hop(et,interval '3' second,interval '5' second);

这里我们指定滑动步长为 3 窗口大小为 5,查看运行结果:

我们可以看到,相同 key 的窗口确实间隔 3s,窗口大小为 5s。

3)会话窗口(时间属性,会话间隔)

select  
id,
SESSION_START(et, INTERVAL '5' SECOND)  wstart,
SESSION_END(et, INTERVAL '5' SECOND)  wend,
sum(vc) sumVc
from ws
group by id, SESSION(et, INTERVAL '5' SECOND);

这里因为我们的数据生成器是源源不断生成数据的,而我们指定了会话间隔为 5s,也就是说只有连续 5s 收不到数据窗口才会关闭,所以我们是看不到数据结果的。

2、窗口表值函数(TVF)聚合

前面我们学习的 Group Window 在 Flink13 版本之后已经被标记为过时了,更推荐的是 TVF Window。

对比GroupWindow,TVF窗口更有效和强大。包括:

  • 提供更多的性能优化手段
  • 支持GroupingSets语法
  • 可以在window聚合中使用TopN
  • 提供累积窗口

       对于窗口表值函数(TVF),窗口本身返回的是就是一个表,所以窗口会出现在FROM后面,GROUP BY后面的则是窗口新增的字段 window_start 和 window_end。

FROM TABLE(
窗口类型(TABLE 表名, DESCRIPTOR(时间字段),INTERVAL 时间间隔)
)
GROUP BY [window_start,][window_end,] --可选

1)滚动窗口(tumble)

Flink SQL> select
> id,
> sum(vc) as vc_sum,
> window_start,
> window_end
> from table(    -- 这里的table是关键字 不是表名
> tumble(table ws,descriptor(et),interval '5' second)
> )
> group by window_start,window_end,id; -- 这里的window_start,window_end都是关键字

运行结果:

2)滑动窗口(hop)

注意:在 TVF 中,滑动窗口的大小必须是步长的整数倍,因为 TVF 会对滑动窗口进行一个优化:把滑动窗口按照步长大小划分为 (窗口大小/步长)个滚动窗口,这样一些需要重复计算的数据所在的滚动窗口只需要计算一次即可。

如果不是整数倍,会报错:[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: HOP table function based aggregate requires size must be an integral multiple of slide, but got size xxx ms and slide xxx ms

Flink SQL> select
> id,
> sum(vc) as vc_sum,
> window_start,
> window_end
> from table(
> hop(table ws,descriptor(et),interval '2' second,interval '6' second)
> )
> group by window_start,window_end,id;

我们上面这个滑动窗口会被 TVF 划分为 6/2 也就是 3 个,其中第一个窗口和第二个窗口之间的会有两个重复的计算结果(也就是两个子滚动窗口)。

3)累积窗口(cumulate)

注意:累积窗口的窗口大小必须是步长的整数倍!

Flink SQL> select
> id,
> sum(vc) vc_sum,
> window_start,
> window_end
> from table(
> cumulate(table ws,descriptor(et),interval '2' second,interval '6' second
> ))
> group by window_start,window_end,id;

累积窗口也很常用,需要指定两个参数(窗口的最大长度,窗口的步长),比如统计一天内每个小时的网站访问量就可以开一个累积窗口(窗口的最大长度是 24h,窗口的步长是 1h),语法也很简单,只需要修改一个滑动窗口的一个关键字:cumulate。当窗口结束后会再开启一个窗口,属性和上一个窗口是一样的。

       累积窗口的底层还是一个滚动窗口,当我们定义了一个累积窗口,就相当于开了一个最大的滚动窗口,之后会根据用户指定的步长(也就是触发计算的时间)将这个窗口划分为多个窗口,这些窗口具有相同的起点和不同的终点。

4)grouping sets 多维分析

SELECT 
window_start, 
window_end, 
id , 
SUM(vc) sumVC
FROM TABLE(
  TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end,
rollup( (id) )
--  cube( (id) )
--  grouping sets( (id),()  )
;

这个多维分析用到的时候再详细了解,这里只知道Flink有这个功能就行了。

5)会话窗口

Flink 17 的 TVF 函数窗口其实是支持会话窗口的,但是现在还是实验性的功能。

3、Over 聚合

       回想我们之前 Hive 学过的开窗函数,对每个数据,我们会去计算它的开窗范围进行计算输出。比如对上一行到这一行的数据进行计算;再比如计算第一行数据到当前行。

       OVER 聚合为一系列有序行的每个输入行计算一个聚合值。与 GROUP BY 聚合相比,OVER聚合不会将每个组的结果行数减少为一行。相反,OVER聚合为每个输入行生成一个聚合值。

       可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定义Over windows。

1)语法

SELECT
  -- 聚合函数
  agg_func(agg_col) OVER (
    [PARTITION BY col1[, col2, ...]]
    ORDER BY time_col
    range_definition),
  ...
FROM ...
  • ORDER BY:必须是时间戳列(事件时间、处理时间),只能升序
  • PARTITION BY:标识了聚合窗口的聚合粒度
  • range_definition:这个标识聚合窗口的聚合数据范围,在 Flink 中有两种指定数据范围的方式。第一种为按照行数聚合(rows 关键字),第二种为按照时间区间聚合(range 关键字)
  • 按照时间分区时,对于同一时刻的数据都会被聚合计算,但是按照行数聚合就不会。

2)案例

1. 按照时间区间聚合

统计每个传感器前 10 秒到现在收到的水位数据条数

select 
    id,
    et,
    vc,
    count(vc) over(partition by id order by et range between interval '10' second preceding and current row) cnt
from ws;

运行结果:

也可以用 WINDOW 子句来在SELECT外部单独定义一个OVER窗口,可以多次使用:

select 
    id,
    et,
    vc,
    count(vc) over w as cnt,    -- 这里的 as 可以省略
    sum(vc) over w as sum_vc
from ws
window w as(
    partition by id 
    order by et 
    range between interval '10' second preceding and current row
);

当我们有多个聚合函数并且这几个窗口函数的开窗属性是相同的时候这样写可以简化代码。

2. 按照行数聚合

统计每个传感器前5条到现在数据的平均水位:

select 
id,
et,
vc,
avg(vc) over w as avg
from ws
window w as (
partition by id
order by et
rows between 5 preceding and current row
);

运行结果:


Flink(十四)【Flink SQL(中)查询】(2)https://developer.aliyun.com/article/1532320

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
9天前
|
SQL NoSQL Java
Java使用sql查询mongodb
通过使用 MongoDB Connector for BI 和 JDBC,开发者可以在 Java 中使用 SQL 语法查询 MongoDB 数据库。这种方法对于熟悉 SQL 的团队非常有帮助,能够快速实现对 MongoDB 数据的操作。同时,也需要注意到这种方法的性能和功能限制,根据具体应用场景进行选择和优化。
33 9
|
29天前
|
SQL 存储 人工智能
Vanna:开源 AI 检索生成框架,自动生成精确的 SQL 查询
Vanna 是一个开源的 Python RAG(Retrieval-Augmented Generation)框架,能够基于大型语言模型(LLMs)为数据库生成精确的 SQL 查询。Vanna 支持多种 LLMs、向量数据库和 SQL 数据库,提供高准确性查询,同时确保数据库内容安全私密,不外泄。
102 7
Vanna:开源 AI 检索生成框架,自动生成精确的 SQL 查询
|
16天前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
90 14
|
2月前
|
SQL Java
使用java在未知表字段情况下通过sql查询信息
使用java在未知表字段情况下通过sql查询信息
39 8
|
2月前
|
SQL 安全 PHP
PHP开发中防止SQL注入的方法,包括使用参数化查询、对用户输入进行过滤和验证、使用安全的框架和库等,旨在帮助开发者有效应对SQL注入这一常见安全威胁,保障应用安全
本文深入探讨了PHP开发中防止SQL注入的方法,包括使用参数化查询、对用户输入进行过滤和验证、使用安全的框架和库等,旨在帮助开发者有效应对SQL注入这一常见安全威胁,保障应用安全。
63 4
|
2月前
|
SQL 监控 关系型数据库
SQL语句当前及历史信息查询-performance schema的使用
本文介绍了如何使用MySQL的Performance Schema来获取SQL语句的当前和历史执行信息。Performance Schema默认在MySQL 8.0中启用,可以通过查询相关表来获取详细的SQL执行信息,包括当前执行的SQL、历史执行记录和统计汇总信息,从而快速定位和解决性能瓶颈。
|
2月前
|
SQL 存储 缓存
如何优化SQL查询性能?
【10月更文挑战第28天】如何优化SQL查询性能?
168 10
|
2月前
|
SQL 关系型数据库 MySQL
|
2月前
|
SQL 关系型数据库 MySQL
mysql编写sql脚本:要求表没有主键,但是想查询没有相同值的时候才进行插入
mysql编写sql脚本:要求表没有主键,但是想查询没有相同值的时候才进行插入
37 0
|
4月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")