Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(三)SQL篇(三)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(三)SQL篇

(2)滑动窗口(HOP)

滑动窗口的使用与滚动窗口类似,可以通过设置滑动步长来控制统计输出的频率。在SQL中通过调用HOP()来声明滑动窗口;除了也要传入表名、时间属性外,还需要传入窗口大小(size)和滑动步长(slide)两个参数。

HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '1' HOURS));

需要注意的是,紧跟在时间属性字段后面的第三个参数是步长(slide),第四个参数才是窗口大小(size)。

(3)累积窗口(CUMULATE)

640.png

累积窗口是窗口TVF中新增的窗口功能,它会在一定的统计周期内进行累积计算。累积窗口中有两个核心的参数:最大窗口长度(max window size)和累积步长(step)。所谓的最大窗口长度其实就是我们所说的“统计周期”,最终目的就是统计这段时间内的数据 在SQL中可以用CUMULATE()函数来定义,具体如下:

CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS))

注意第三个参数为步长step,第四个参数则是最大窗口长度。上面所有的语句只是定义了窗口,类似于DataStream API中的窗口分配器;在SQL中窗口的完整调用,还需要配合聚合操作和其它操作。

五、聚合(Aggregation)查询

Flink 中的SQL是流处理与标准SQL结合的产物,所以聚合查询也可以分成两种:流处理中特有的聚合(主要指窗口聚合),以及SQL原生的聚合查询方式。

5.1 分组聚合

SQL中一般所说的聚合我们都很熟悉,主要是通过内置的一些聚合函数来实现的,比如SUM()、MAX()、MIN()、AVG()以及COUNT()。它们的特点是对多条输入数据进行计算,得到一个唯一的值,属于“多对一”的转换。比如我们可以通过下面的代码计算输入数据的个数:

Table eventCountTable = tableEnv.sqlQuery("select COUNT(*) from EventTable");

而更多的情况下,我们可以通过GROUP BY子句来指定分组的键(key),从而对数据按照某个字段做一个分组统计。例如之前我们举的例子,可以按照用户名进行分组,统计每个用户点击url的次数:

SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user

这种聚合方式,就叫作“分组聚合”(group aggregation)。想要将结果表转换成流或输出到外部系统,必须采用撤回流(retract stream)或更新插入流(upsert stream)的编码方式;如果在代码中直接转换成DataStream打印输出,需要调用toChangelogStream()。

分组聚合既是SQL原生的聚合查询,也是流处理中的聚合操作,这是实际应用中最常见的聚合方式。当然,使用的聚合函数一般都是系统内置的,如果希望实现特殊需求也可以进行自定义。

5.2 窗口聚合

在Flink的Table API和SQL中,窗口的计算是通过“窗口聚合”(window aggregation)来实现的。与分组聚合类似,窗口聚合也需要调用SUM()、MAX()、MIN()、COUNT()一类的聚合函数,通过GROUP BY子句来指定分组的字段。只不过窗口聚合时,需要将窗口信息作为分组key的一部分定义出来。

在Flink 1.12版本之前,是直接把窗口自身作为分组key放在GROUP BY之后的,所以也叫“分组窗口聚合”;而1.13版本开始使用了“窗口表值函数”(Windowing TVF),窗口本身返回的是就是一个表,所以窗口会出现在FROM后面,GROUP BY后面的则是窗口新增的字段window_start和window_end。例如:

Table result = tableEnv.sqlQuery(
                        "SELECT " +
                            "user, " +
                            "window_end AS endT, " +
                            "COUNT(url) AS cnt " +
                        "FROM TABLE( " +
                                  "TUMBLE( TABLE EventTable, " +
                                  "DESCRIPTOR(ts), " +
                                  "INTERVAL '1' HOUR)) " +
                        "GROUP BY user, window_start, window_end "
                );

Flink SQL目前提供了滚动窗口TUMBLE()、滑动窗口HOP()和累积窗口(CUMULATE)三种表值函数(TVF)。在具体应用中,我们还需要提前定义好时间属性。下面是一段窗口聚合的完整代码,以累积窗口为例:

public class CumulateWindowExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
// 读取数据源,并分配时间戳、生成水位线
        SingleOutputStreamOperator<Event> eventStream = env
                .fromElements(
                        new Event("Alice", "./home", 1000L),
                        new Event("Bob", "./cart", 1000L),
                        new Event("Alice", "./prod?id=1",  25 * 60 * 1000L),
                        new Event("Alice", "./prod?id=4", 55 * 60 * 1000L),
                        new Event("Bob", "./prod?id=5", 3600 * 1000L + 60 * 1000L),
                        new Event("Cary", "./home", 3600 * 1000L + 30 * 60 * 1000L),
                        new Event("Cary", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                })
                );
// 创建表环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 将数据流转换成表,并指定时间属性
        Table eventTable = tableEnv.fromDataStream(
                eventStream,
                $("user"),
                $("url"),
                $("timestamp").rowtime().as("ts")  
        );
// 为方便在SQL中引用,在环境中注册表EventTable
        tableEnv.createTemporaryView("EventTable", eventTable);
// 设置累积窗口,执行SQL统计查询
        Table result = tableEnv
                .sqlQuery(
                        "SELECT " +
                            "user, " +
                            "window_end AS endT, " +
                            "COUNT(url) AS cnt " +
                        "FROM TABLE( " +
                            "CUMULATE( TABLE EventTable, " +    // 定义累积窗口
                                "DESCRIPTOR(ts), " +
                                "INTERVAL '30' MINUTE, " +
                                "INTERVAL '1' HOUR)) " +
                        "GROUP BY user, window_start, window_end "
                );
        tableEnv.toDataStream(result).print();
        env.execute();
    }
}

基于窗口的聚合,是流处理中聚合统计的一个特色,也是与标准SQL最大的不同之处。在实际项目中,很多统计指标其实都是基于时间窗口来进行计算的,所以窗口聚合是Flink SQL中非常重要的功能;基于窗口TVF的聚合未来也会有更多功能的扩展支持,比如窗口TOP-N、会话窗口、窗口联结等等。

5.3 开窗(Over)聚合

在标准SQL中还有另外一类比较特殊的聚合方式,可以针对每一行计算一个聚合值,这就是所谓的“开窗函数”。开窗函数的聚合与之前两种聚合有本质的不同:分组聚合、窗口TVF聚合都是“多对一”的关系,将数据分组之后每组只会得到一个聚合结果;而开窗函数是对每行都要做一次开窗聚合,因此聚合之后表中的行数不会有任何减少,是一个“多对多”的关系。

与标准SQL中一致,Flink SQL中的开窗函数也是通过OVER子句来实现的,所以有时开窗聚合也叫作“OVER聚合”(Over Aggregation)。基本语法如下:

SELECT
  <聚合函数> OVER (
    [PARTITION BY <字段1>[, <字段2>, ...]]
    ORDER BY <时间属性字段>
    <开窗范围>),
  ...
FROM ...

这里OVER关键字前面是一个聚合函数,它会应用在后面OVER定义的窗口上。在OVER子句中主要有以下几个部分:

PARTITION BY(可选)

用来指定分区的键(key),类似于GROUP BY的分组,这部分是可选的;

ORDER BY

在OVER子句中必须用ORDER BY明确地指出数据基于那个字段排序。在Flink的流处理中,目前只支持按照时间属性的升序排列,所以这里ORDER BY后面的字段必须是定义好的时间属性。

开窗范围

由BETWEEN <下界> AND <上界> 定义,也就是“从下界到上界”的范围。目前支持的上界只能是CURRENT ROW,也就是定义一个“从之前某一行到当前行”的范围,所以一般的形式为:

BETWEEN ... PRECEDING AND CURRENT ROW

开窗选择的范围可以基于时间,也可以基于数据的数量。所以开窗范围还应该在两种模式之间做出选择:范围间隔(RANGE intervals)和行间隔(ROW intervals)。

范围间隔

范围间隔以RANGE为前缀,就是基于ORDER BY指定的时间字段去选取一个范围,一般就是当前行时间戳之前的一段时间。例如开窗范围选择当前行之前1小时的数据:

RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW

行间隔

行间隔以ROWS为前缀,就是直接确定要选多少行,由当前行出发向前选取就可以了。例如开窗范围选择当前行之前的5行数据(最终聚合会包括当前行,所以一共6条数据):

ROWS BETWEEN 5 PRECEDING AND CURRENT ROW

下面是一个具体示例:

SELECT user, ts,
        COUNT(url) OVER (
            PARTITION BY user
            ORDER BY ts
            RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
        ) AS cnt
FROM EventTable

开窗聚合与窗口聚合(窗口TVF聚合)本质上不同,不过也还是有一些相似之处的:它们都是在无界的数据流上划定了一个范围,截取出有限数据集进行聚合统计;这其实都是“窗口”的思路。事实上,在Table API中确实就定义了两类窗口:分组窗口(GroupWindow)和开窗窗口(OverWindow);而在SQL中,也可以用WINDOW子句来在SELECT外部单独定义一个OVER窗口:

SELECT user, ts,
  COUNT(url) OVER w AS cnt,
  MAX(CHAR_LENGTH(url)) OVER w AS max_url
FROM EventTable
WINDOW w AS (
  PARTITION BY user
  ORDER BY ts
  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)

上面的SQL中定义了一个选取之前2行数据的OVER窗口,并重命名为w;接下来就可以基于它调用多个聚合函数,扩展出更多的列提取出来。

5.4 应用实例 —— TOP-N

目前在Flink SQL中没有能够直接调用的TOP-N函数,而是提供了稍微复杂些的变通实现方法。下面是一个具体案例的代码实现。由于用户访问事件Event中没有商品相关信息,因此我们统计每小时内有最多访问行为的用户,取前两名,相当于是一个每小时活跃用户的查询。

public class WindowTopNExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
// 读取数据源,并分配时间戳、生成水位线
        SingleOutputStreamOperator<Event> eventStream = env
                .fromElements(
                        new Event("Alice", "./home", 1000L),
                        new Event("Bob", "./cart", 1000L),
                        new Event("Alice", "./prod?id=1",  25 * 60 * 1000L),
                        new Event("Alice", "./prod?id=4", 55 * 60 * 1000L),
                        new Event("Bob", "./prod?id=5", 3600 * 1000L + 60 * 1000L),
                        new Event("Cary", "./home", 3600 * 1000L + 30 * 60 * 1000L),
                        new Event("Cary", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                })
                );
        // 创建表环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 将数据流转换成表,并指定时间属性
        Table eventTable = tableEnv.fromDataStream(
                eventStream,
                $("user"),
                $("url"),
                $("timestamp").rowtime().as("ts")  
// 将timestamp指定为事件时间,并命名为ts
        );
        // 为方便在SQL中引用,在环境中注册表EventTable
        tableEnv.createTemporaryView("EventTable", eventTable);
        // 定义子查询,进行窗口聚合,得到包含窗口信息、用户以及访问次数的结果表
        String subQuery =
                "SELECT window_start, window_end, user, COUNT(url) as cnt " +
                "FROM TABLE ( " +
                    "TUMBLE( TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR )) " +
                "GROUP BY window_start, window_end, user ";
        // 定义TOP-N的外层查询
        String topNQuery =
                "SELECT * " +
                "FROM (" +
                    "SELECT *, " +
                        "ROW_NUMBER() OVER ( " +
                            "PARTITION BY window_start, window_end " +
                            "ORDER BY cnt desc " +
                        ") AS row_num " +
                    "FROM (" + subQuery + ")) " +
                "WHERE row_num <= 2";
        // 执行SQL得到结果表
        Table result = tableEnv.sqlQuery(topNQuery);
        tableEnv.toDataStream(result).print();
        env.execute();
    }
}

六、联结(Join)查询

在标准SQL中,可以将多个表连接合并起来,从中查询出想要的信息;这种操作就是表的联结(Join)。在Flink SQL中,同样支持各种灵活的联结(Join)查询,操作的对象是动态表。在流处理中,动态表的Join对应着两条数据流的Join操作。Flink SQL中的联结查询大体上也可以分为两类:SQL原生的联结查询方式,和流处理中特有的联结查询。


6.1 常规联结查询

常规联结(Regular Join)是SQL中原生定义的Join方式,是最通用的一类联结操作。它的具体语法与标准SQL的联结完全相同,通过关键字JOIN来联结两个表,后面用关键字ON来指明联结条件。与标准SQL一致,Flink SQL的常规联结也可以分为内联结(INNER JOIN)和外联结(OUTER JOIN),区别在于结果中是否包含不符合联结条件的行。目前仅支持“等值条件”作为联结条件,也就是关键字ON后面必须是判断两表中字段相等的逻辑表达式。

1. 等值内联结(INNER Equi-JOIN)

内联结用INNER JOIN来定义,会返回两表中符合联接条件的所有行的组合,也就是所谓的笛卡尔积(Cartesian product)。目前仅支持等值联结条件。例如:

SELECT *
FROM Order
INNER JOIN Product
ON Order.product_id = Product.id

2. 等值外联结(OUTER Equi-JOIN)

与内联结类似,外联结也会返回符合联结条件的所有行的笛卡尔积;另外,还可以将某一侧表中找不到任何匹配的行也单独返回。Flink SQL支持左外(LEFT JOIN)、右外(RIGHT JOIN)和全外(FULL OUTER JOIN),分别表示会将左侧表、右侧表以及双侧表中没有任何匹配的行返回。具体用法如下:

SELECT *
FROM Order
LEFT JOIN Product
ON Order.product_id = Product.id
SELECT *
FROM Order
RIGHT JOIN Product
ON Order.product_id = Product.id
SELECT *
FROM Order
FULL OUTER JOIN Product
ON Order.product_id = Product.id

这部分知识与标准SQL中是完全一样的。

6.2 间隔联结查询

我们曾经学习过DataStream API中的双流Join,包括窗口联结(window join)和间隔联结(interval join)。两条流的Join就对应着SQL中两个表的Join,这是流处理中特有的联结方式。目前Flink SQL还不支持窗口联结,而间隔联结则已经实现。间隔联结(Interval Join)返回的,同样是符合约束条件的两条中数据的笛卡尔积。只不过这里的“约束条件”除了常规的联结条件外,还多了一个时间间隔的限制。具体语法有以下要点:

两表的联结

间隔联结不需要用JOIN关键字,直接在FROM后将要联结的两表列出来就可以,用逗号分隔。这与标准SQL中的语法一致,表示一个“交叉联结”(Cross Join),会返回两表中所有行的笛卡尔积。

联结条件

联结条件用WHERE子句来定义,用一个等值表达式描述。交叉联结之后再用WHERE进行条件筛选,效果跟内联结INNER JOIN ... ON ...非常类似。

时间间隔限制

我们可以在WHERE子句中,联结条件后用AND追加一个时间间隔的限制条件;做法是提取左右两侧表中的时间字段,然后用一个表达式来指明两者需要满足的间隔限制。具体定义方式有下面三种,这里分别用ltime和rtime表示左右表中的时间字段:

(1)ltime = rtime
(2)ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
(3)ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

例如,我们现在除了订单表Order外,还有一个“发货表”Shipment,要求在收到订单后四个小时内发货。那么我们就可以用一个间隔联结查询,把所有订单与它对应的发货信息连接合并在一起返回。

SELECT *
FROM Order o, Shipment s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
SQL 资源调度 Oracle
Flink CDC产品常见问题之sql运行中查看日志任务失败如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
SQL API 数据处理
新一代实时数据集成框架 Flink CDC 3.0 —— 核心技术架构解析
本文整理自阿里云开源大数据平台吕宴全关于新一代实时数据集成框架 Flink CDC 3.0 的核心技术架构解析。
779 0
新一代实时数据集成框架 Flink CDC 3.0 —— 核心技术架构解析
|
2月前
|
分布式计算 API 数据处理
Flink【基础知识 01】(简介+核心架构+分层API+集群架构+应用场景+特点优势)(一篇即可大概了解flink)
【2月更文挑战第15天】Flink【基础知识 01】(简介+核心架构+分层API+集群架构+应用场景+特点优势)(一篇即可大概了解flink)
70 1
|
2月前
|
SQL 关系型数据库 MySQL
Flink 提供了一种名为 Flink SQL 的查询语言,它支持多种数据库之间的 DDL 语句转换
【2月更文挑战第18天】Flink 提供了一种名为 Flink SQL 的查询语言,它支持多种数据库之间的 DDL 语句转换
174 2
|
2月前
|
SQL 存储 Apache
在 Apache Flink SQL 中,并没有内置的 GROUP_CONCAT 函数
【2月更文挑战第16天】在 Apache Flink SQL 中,并没有内置的 GROUP_CONCAT 函数
212 2
|
2月前
|
SQL 分布式计算 HIVE
基于 Kyuubi 实现分布式 Flink SQL 网关
本文整理自网易互娱资深开发工程师、Apache Kyuubi Committer 林小铂的《基于 Kyuubi 实现分布式 Flink SQL 网关》分享。
104472 64
基于 Kyuubi 实现分布式 Flink SQL 网关
|
3天前
|
API 持续交付 开发者
构建高效微服务架构:后端开发的新视角
【5月更文挑战第8天】 随着现代软件开发的演变,微服务架构已经成为了企业追求敏捷、可扩展和灵活部署的重要解决方案。本文将深入探讨如何构建一个高效的微服务架构,包括关键的设计原则、技术栈选择以及持续集成与部署的最佳实践。我们还将讨论微服务带来的挑战,如数据一致性、服务发现和网络延迟,并提出相应的解决策略。通过本文,后端开发者将获得构建和维护微服务系统所需的深度知识,并了解如何在不断变化的技术环境中保持系统的健壮性和可维护性。
35 8
|
1天前
|
监控 持续交付 Docker
使用Docker进行微服务架构的最佳实践
【5月更文挑战第10天】本文探讨了使用Docker实施微服务架构的最佳实践。首先,理解微服务架构是拆分小型独立服务的模式,借助Docker实现快速部署、高可移植性和环境一致性。Docker的优势在于服务扩展、容器编排、自动化构建与部署。最佳实践包括:定义清晰服务边界,使用Dockerfile和Docker Compose自动化构建,利用Docker Swarm或Kubernetes编排,实施服务发现和负载均衡,监控与日志记录,以及持续集成和持续部署。Docker虽重要,但需与其他技术结合以确保系统整体稳定性。
|
1天前
|
缓存 负载均衡 API
微服务架构下的API网关性能优化实践
【5月更文挑战第10天】在微服务架构中,API网关作为前端和后端服务之间的关键枢纽,其性能直接影响到整个系统的响应速度和稳定性。本文将探讨在高并发场景下,如何通过缓存策略、负载均衡、异步处理等技术手段对API网关进行性能优化,以确保用户体验和服务的可靠性。
|
1天前
|
监控 持续交付 开发者
构建高效微服务架构:后端开发的新范式
【5月更文挑战第10天】在现代软件开发领域,微服务架构已经成为一种流行的设计模式,它通过将大型应用程序拆分为一组小型、独立和松散耦合的服务来提供更高的可伸缩性和灵活性。本文深入探讨了微服务架构的设计理念、实施步骤以及面临的挑战,并提出了一套实用的策略和最佳实践,帮助后端开发者构建和维护高效的微服务系统。

热门文章

最新文章