Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(4)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】

Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(3)https://developer.aliyun.com/article/1532278

5.5、分组聚合

       SQL中一般所说的聚合我们都很熟悉,主要是通过内置的一些聚合函数来实现的,比如SUM()、MAX()、MIN()、AVG()以及COUNT()。它们的特点是对多条输入数据进行计算,得到一个唯一的值,属于“多对一”的转换。比如我们可以通过下面的代码计算输入数据的个数:

select COUNT(*) from source;

我们之前说过,动态表转为流,对于持续查询来说是一种更新查询,这里很明显是追加流和撤回流,而不是更新插入流。

      而更多的情况下,我们可以通过GROUP BY子句来指定分组的键(key),从而对数据按照某个字段做一个分组统计。

SELECT vc, COUNT(*) as cnt FROM source GROUP BY vc;

       这种聚合方式,就叫作“分组聚合”(group aggregation)。想要将结果表转换成流或输出到外部系统,必须采用撤回流(retract stream)或更新插入流(upsert stream)的编码方式;如果在代码中直接转换成DataStream打印输出,需要调用toChangelogStream()。

       分组聚合既是SQL原生的聚合查询,也是流处理中的聚合操作,这是实际应用中最常见的聚合方式。当然,使用的聚合函数一般都是系统内置的,如果希望实现特殊需求也可以进行自定义。

1)group聚合案例

CREATE TABLE source1 (
dim STRING,
user_id BIGINT,
price BIGINT,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- 指定了水位线为 row_time 字段 - 5s
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);
 
CREATE TABLE sink1 (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
);
 
insert into sink1
select dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 计算 uv 数
count(distinct user_id) as uv,
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
from source1
group by
dim,
-- UNIX_TIMESTAMP得到秒的时间戳,将秒级别时间戳 / 60 转化为 1min, 
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint);

这里可以看到,我们在自动生成数据的时候,并没有指定字段的生成类型(比如是自增序列还是随机数或者字符串) ,因为只要我们指定了 max 和 min 那么这就是一个随机数;如果我们指定了 start 和 end,那么就代表这是自增序列;如果指定了 length ,就代表这是一个字符串。

这里我们也可以不用 insert into 到 sink,而是直接查询,效果是一样的

Flink SQL> select dim,
> count(*) as pv,
> sum(price) as sum_price,
> max(price) as max_price,
> min(price) as min_price,
> -- 计算 uv 数
> count(distinct user_id) as uv,
> cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
> from source1
> group by
> dim,
> -- UNIX_TIMESTAMP得到秒的时间戳,将秒级别时间戳 / 60 转化为 1min,
> cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint);

2)多维分析

多维分析,举个例子比如我们要统计关于学生成绩的信息(最高分、最低分、平均分),我们可以从不同维度(年级、学科、性别)去统计,比如每个年级的最高分、最低分、平均分;或者不同性别的最高分... 不同年级不同学科的最高分... 或者不同年级、不同学科、不同性别的最高分...。

Group 聚合也支持 Grouping sets 、Rollup 、Cube,如下案例是Grouping sets:

SELECT
  supplier_id
, rating
, product_id
, COUNT(*)
FROM (
VALUES
  ('supplier1', 'product1', 4),
  ('supplier1', 'product2', 3),
  ('supplier2', 'product3', 3),
  ('supplier2', 'product4', 4)
)
-- 供应商id、产品id、评级
AS Products(supplier_id, product_id, rating)  
GROUP BY GROUPING SETS(
  (supplier_id, product_id, rating),
  (supplier_id, product_id),
  (supplier_id, rating),
  (supplier_id),
  (product_id, rating),
  (product_id),
  (rating),
  ()
);

这段 Flink SQL 代码的主要目的是对一组产品数据进行分组聚合。

VALUES 语句:

VALUES
  ('supplier1', 'product1', 4),
  ('supplier1', 'product2', 3),
  ('supplier2', 'product3', 3),
  ('supplier2', 'product4', 4)
)

这部分定义了一个包含四行数据的虚拟表。每一行代表一个产品的供应商ID、产品ID和评级。

2. AS Products(supplier_id, product_id, rating):

AS Products(supplier_id, product_id, rating)

这部分将虚拟表重命名为 "Products",并为每一列定义了别名:supplier_id、product_id 和 rating。

3. GROUP BY GROUPING SETS:

GROUPING SETS 是 SQL 中的一种功能,它允许你指定多个分组条件,并为每个分组条件返回一个结果。这在探索多个维度聚合时非常有用。

在这个例子中,我们可以看到以下分组条件:

  • supplier_id、product_id、rating
  • supplier_id、product_id
  • supplier_id、rating
  • supplier_id
  • product_id、rating
  • product_id
  • rating
  • ()(空分组)

       这意味着,对于每个供应商ID、产品ID和评级的组合,都会进行计数。这实际上是计算每个供应商的每个产品以及每个产品的总评级的计数。同时,也计算了每个供应商的总评级、每个产品的总评级以及所有产品的总评级。最后,还计算了所有记录的总数(这是通过空分组实现的)。

SELECT 语句:

这个部分选择了上述 GROUPING SETS 中的所有列,并添加了一个 COUNT(*) 函数来计算每个分组的记录数。

所以,这段代码的输出将为给定的数据集提供以下聚合信息:

  • 每个供应商的每个产品的数量以及评级;
  • 每个供应商的每个产品的数量;
  • 每个供应商的评级数量;
  • 每个产品的评级数量;
  • 每个供应商的数量;
  • 每个产品的数量;
  • 评级的数量;
  • 所有记录的数量。

剩下的内容下一节补齐,内容太多了,编辑起来卡顿。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
21天前
|
SQL 存储 人工智能
Vanna:开源 AI 检索生成框架,自动生成精确的 SQL 查询
Vanna 是一个开源的 Python RAG(Retrieval-Augmented Generation)框架,能够基于大型语言模型(LLMs)为数据库生成精确的 SQL 查询。Vanna 支持多种 LLMs、向量数据库和 SQL 数据库,提供高准确性查询,同时确保数据库内容安全私密,不外泄。
91 7
Vanna:开源 AI 检索生成框架,自动生成精确的 SQL 查询
|
8天前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
70 14
|
28天前
|
SQL Java
使用java在未知表字段情况下通过sql查询信息
使用java在未知表字段情况下通过sql查询信息
36 8
|
1月前
|
SQL 安全 PHP
PHP开发中防止SQL注入的方法,包括使用参数化查询、对用户输入进行过滤和验证、使用安全的框架和库等,旨在帮助开发者有效应对SQL注入这一常见安全威胁,保障应用安全
本文深入探讨了PHP开发中防止SQL注入的方法,包括使用参数化查询、对用户输入进行过滤和验证、使用安全的框架和库等,旨在帮助开发者有效应对SQL注入这一常见安全威胁,保障应用安全。
59 4
|
1月前
|
SQL 监控 关系型数据库
SQL语句当前及历史信息查询-performance schema的使用
本文介绍了如何使用MySQL的Performance Schema来获取SQL语句的当前和历史执行信息。Performance Schema默认在MySQL 8.0中启用,可以通过查询相关表来获取详细的SQL执行信息,包括当前执行的SQL、历史执行记录和统计汇总信息,从而快速定位和解决性能瓶颈。
|
1月前
|
SQL 存储 缓存
如何优化SQL查询性能?
【10月更文挑战第28天】如何优化SQL查询性能?
143 10
|
1月前
|
SQL 关系型数据库 MySQL
|
2月前
|
SQL 数据库 开发者
功能发布-自定义SQL查询
本期主要为大家介绍ClkLog九月上线的新功能-自定义SQL查询。
|
1月前
|
SQL 关系型数据库 MySQL
mysql编写sql脚本:要求表没有主键,但是想查询没有相同值的时候才进行插入
mysql编写sql脚本:要求表没有主键,但是想查询没有相同值的时候才进行插入
35 0
|
2月前
|
SQL 数据可视化 BI
SQL语句及查询结果解析:技巧与方法
在数据库管理和数据分析中,SQL语句扮演着至关重要的角色