Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(3)https://developer.aliyun.com/article/1532278
5.5、分组聚合
SQL中一般所说的聚合我们都很熟悉,主要是通过内置的一些聚合函数来实现的,比如SUM()、MAX()、MIN()、AVG()以及COUNT()。它们的特点是对多条输入数据进行计算,得到一个唯一的值,属于“多对一”的转换。比如我们可以通过下面的代码计算输入数据的个数:
select COUNT(*) from source;
我们之前说过,动态表转为流,对于持续查询来说是一种更新查询,这里很明显是追加流和撤回流,而不是更新插入流。
而更多的情况下,我们可以通过GROUP BY子句来指定分组的键(key),从而对数据按照某个字段做一个分组统计。
SELECT vc, COUNT(*) as cnt FROM source GROUP BY vc;
这种聚合方式,就叫作“分组聚合”(group aggregation)。想要将结果表转换成流或输出到外部系统,必须采用撤回流(retract stream)或更新插入流(upsert stream)的编码方式;如果在代码中直接转换成DataStream打印输出,需要调用toChangelogStream()。
分组聚合既是SQL原生的聚合查询,也是流处理中的聚合操作,这是实际应用中最常见的聚合方式。当然,使用的聚合函数一般都是系统内置的,如果希望实现特殊需求也可以进行自定义。
1)group聚合案例
CREATE TABLE source1 ( dim STRING, user_id BIGINT, price BIGINT, row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)), -- 指定了水位线为 row_time 字段 - 5s WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.dim.length' = '1', 'fields.user_id.min' = '1', 'fields.user_id.max' = '100000', 'fields.price.min' = '1', 'fields.price.max' = '100000' ); CREATE TABLE sink1 ( dim STRING, pv BIGINT, sum_price BIGINT, max_price BIGINT, min_price BIGINT, uv BIGINT, window_start bigint ) WITH ( 'connector' = 'print' ); insert into sink1 select dim, count(*) as pv, sum(price) as sum_price, max(price) as max_price, min(price) as min_price, -- 计算 uv 数 count(distinct user_id) as uv, cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start from source1 group by dim, -- UNIX_TIMESTAMP得到秒的时间戳,将秒级别时间戳 / 60 转化为 1min, cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint);
这里可以看到,我们在自动生成数据的时候,并没有指定字段的生成类型(比如是自增序列还是随机数或者字符串) ,因为只要我们指定了 max 和 min 那么这就是一个随机数;如果我们指定了 start 和 end,那么就代表这是自增序列;如果指定了 length ,就代表这是一个字符串。
这里我们也可以不用 insert into 到 sink,而是直接查询,效果是一样的
Flink SQL> select dim, > count(*) as pv, > sum(price) as sum_price, > max(price) as max_price, > min(price) as min_price, > -- 计算 uv 数 > count(distinct user_id) as uv, > cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start > from source1 > group by > dim, > -- UNIX_TIMESTAMP得到秒的时间戳,将秒级别时间戳 / 60 转化为 1min, > cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint);
2)多维分析
多维分析,举个例子比如我们要统计关于学生成绩的信息(最高分、最低分、平均分),我们可以从不同维度(年级、学科、性别)去统计,比如每个年级的最高分、最低分、平均分;或者不同性别的最高分... 不同年级不同学科的最高分... 或者不同年级、不同学科、不同性别的最高分...。
Group 聚合也支持 Grouping sets 、Rollup 、Cube,如下案例是Grouping sets:
SELECT supplier_id , rating , product_id , COUNT(*) FROM ( VALUES ('supplier1', 'product1', 4), ('supplier1', 'product2', 3), ('supplier2', 'product3', 3), ('supplier2', 'product4', 4) ) -- 供应商id、产品id、评级 AS Products(supplier_id, product_id, rating) GROUP BY GROUPING SETS( (supplier_id, product_id, rating), (supplier_id, product_id), (supplier_id, rating), (supplier_id), (product_id, rating), (product_id), (rating), () );
这段 Flink SQL 代码的主要目的是对一组产品数据进行分组聚合。
VALUES 语句:
VALUES ('supplier1', 'product1', 4), ('supplier1', 'product2', 3), ('supplier2', 'product3', 3), ('supplier2', 'product4', 4) )
这部分定义了一个包含四行数据的虚拟表。每一行代表一个产品的供应商ID、产品ID和评级。
2. AS Products(supplier_id, product_id, rating):
AS Products(supplier_id, product_id, rating)
这部分将虚拟表重命名为 "Products",并为每一列定义了别名:supplier_id、product_id 和 rating。
3. GROUP BY GROUPING SETS:
GROUPING SETS 是 SQL 中的一种功能,它允许你指定多个分组条件,并为每个分组条件返回一个结果。这在探索多个维度聚合时非常有用。
在这个例子中,我们可以看到以下分组条件:
- supplier_id、product_id、rating
- supplier_id、product_id
- supplier_id、rating
- supplier_id
- product_id、rating
- product_id
- rating
- ()(空分组)
这意味着,对于每个供应商ID、产品ID和评级的组合,都会进行计数。这实际上是计算每个供应商的每个产品以及每个产品的总评级的计数。同时,也计算了每个供应商的总评级、每个产品的总评级以及所有产品的总评级。最后,还计算了所有记录的总数(这是通过空分组实现的)。
SELECT 语句:
这个部分选择了上述 GROUPING SETS 中的所有列,并添加了一个 COUNT(*) 函数来计算每个分组的记录数。
所以,这段代码的输出将为给定的数据集提供以下聚合信息:
- 每个供应商的每个产品的数量以及评级;
- 每个供应商的每个产品的数量;
- 每个供应商的评级数量;
- 每个产品的评级数量;
- 每个供应商的数量;
- 每个产品的数量;
- 评级的数量;
- 所有记录的数量。
剩下的内容下一节补齐,内容太多了,编辑起来卡顿。