Flink(十四)【Flink SQL(中)查询】(2)

本文涉及的产品
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
RDS DuckDB + QuickBI 企业套餐,8核32GB + QuickBI 专业版
简介: Flink(十四)【Flink SQL(中)查询】

Flink(十四)【Flink SQL(中)查询】(1)https://developer.aliyun.com/article/1532319

4、特殊语法 - TopN

       目前在Flink SQL中没有能够直接调用的TOP-N函数,而是提供了稍微复杂些的变通实现方法,是固定写法,特殊支持的over用法。

1)语法

SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
-- 不能指定行范围!!! 只能从最早到当前
FROM table_name)
WHERE rownum <= N [AND conditions]
  • ROW_NUMBER() :标识 TopN 排序子句
  • PARTITION BY col1[, col2...] :标识分区字段,代表按照这个 col 字段作为分区粒度对数据进行排序取 topN,比如下述案例中的 partition by key ,就是根据需求中的搜索关键词(key)做为分区
  • ORDER BY col1 [asc|desc][, col2 [asc|desc]...] :标识 TopN 的排序规则,是按照哪些字段、顺序或逆序进行排序,可以不是时间字段,也可以降序(TopN特殊支持)
  • WHERE rownum <= N :这个子句是一定需要的,只有加上了这个子句,Flink 才能将其识别为一个TopN 的查询,其中 N 代表 TopN 的条目数
  • [AND conditions] :其他的限制条件也可以加上

2)案例

取每个传感器最高的3个水位值

select 
id,
et,
vc,
rownum
from(
    select id,
    vc,
    row_number() over (partition by id order by vc desc ) as rownum
    from ws
)where rownum <= 3;

运行结果:

5、特殊语法 - Deduplication 去重

       这种语法也是借助于 over 窗口,语法和 topN 相似,唯一不同的是它要求排序的字段必须是时间属性列,同样可以降序,不可以是其它非时间属性的列。去重的原理是按时间升序或降序然后取第一条,所以升序和降序很影响性能,我们一般建议升序,因为这样当来了一条相同的数据时,由于按照时间升序,新数据的时间小,不会取代已有的数据;但是如果是降序的话,新数据的时间比较晚会被放到第一条数据的位置,就要发生更新操作;所以我们一般使用升序。

       在 row_number = 1 时,如果排序字段是普通列 planner 会翻译成 TopN 算子,如果是时间属性列 planner 会翻译成 Deduplication,这两者最终的执行算子是不一样的,Deduplication 相比 TopN 算子专门做了对应的优化,性能会有很大提升。可以从web UI 看出是翻译成哪种算子(TopN 的算子叫 Rank、Deduplication 的算子叫 Deduplication、普通聚合(比如sum、avg)的算子叫 OverAggregate)。

如果是按照时间属性字段降序,表示取最新一条,会造成不断的更新保存最新的一条。如果是升序,表示取最早的一条,不用去更新,性能更好。

1)语法

SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY time_attr [asc|desc]) AS rownum
FROM table_name)
WHERE rownum = 1;

2)案例

对每个传感器的水位值去重

select
id,
et,
vc,
rownum
from (
    select 
    id,
    et,
    vc,
    row_number() over(partition by id,vc order by et asc) as rownum
    from ws 
)where rownum = 1;

运行结果:

我们发现所有的结果的操作都是 +I  说明没有重复数据,都是新插入的数据。

6、联结查询

       在标准SQL中,可以将多个表连接合并起来,从中查询出想要的信息;这种操作就是表的联结(Join)。在Flink SQL中,同样支持各种灵活的联结(Join)查询,操作的对象是动态表。

       在流处理中,动态表的Join对应着两条数据流的Join操作。Flink SQL中的联结查询大体上也可以分为两类:SQL原生的联结查询方式,和流处理中特有的联结查询。

注意:什么 join 就以谁为主,left join 就是以左边的流为主,右边的流来了只能老老实实待着,待在状态里没有资格输出。right join 同理。

6.1、常规联结查询

       常规联结(Regular Join)是SQL中原生定义的Join方式,是最通用的一类联结操作。它的具体语法与标准SQL的联结完全相同,通过关键字JOIN来联结两个表,后面用关键字ON来指明联结条件。

       与标准SQL一致,Flink SQL的常规联结也可以分为内联结(INNER JOIN)和外联结(OUTER JOIN),区别在于结果中是否包含不符合联结条件的行。

Regular Join 包含以下几种(以 L 作为左流中的数据标识, R 作为右流中的数据标识):

  • Inner Join(Inner Equal Join):流任务中,只有两条流 Join 到才输出,输出 +[L, R]
  • Left Join(Outer Equal Join):流任务中,左流数据到达之后,无论有没有 Join 到右流的数据,都会输出(Join 到输出 +[L, R] ,没 Join 到输出 +[L, null] ),如果右流之后数据到达之后,发现左流之前输出过没有 Join 到的数据,则会发起回撤流,先输出 -[L, null] ,然后输出 +[L, R]
  • Right Join(Outer Equal Join):有 Left Join 一样,左表和右表的执行逻辑完全相反
  • Full Join(Outer Equal Join):流任务中,左流或者右流的数据到达之后,无论有没有 Join 到另外一条流的数据,都会输出(对右流来说:Join 到输出 +[L, R] ,没 Join 到输出 +[null, R] ;对左流来说:Join 到输出 +[L, R] ,没 Join 到输出 +[L, null] )。如果一条流的数据到达之后,发现之前另一条流之前输出过没有 Join 到的数据,则会发起回撤流(左流数据到达为例:回撤 -[null, R] ,输出+[L, R] ,右流数据到达为例:回撤 -[L, null] ,输出 +[L, R]

Regular Join 的注意事项:

  • 实时 Regular Join 可以不是 等值 join 。等值 join 和 非等值 join 区别在于, 等值 join数据 shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游; 非等值 join 数据 shuffle 策略是 Global,所有数据发往一个并发,按照非等值条件进行关联。
  • 流的上游是无限的数据,所以要做到关联的话,Flink 会将两条流的所有数据都存储在 State 中,所以 Flink 任务的 State 会无限增大,因此你需要为 State 配置合适的 TTL,以防止 State 过大
  • join 的时候,左右两条流中的数据都会先存到状态中去,先来的数据会去对方的状态中去查找有没有可以匹配上的,没有就匹配为 null。

我们再准备一张表用于join:

CREATE TABLE ws1 (
  id INT,
  vc INT,
  pt AS PROCTIME(), --处理时间
  et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件时间
  WATERMARK FOR et AS et - INTERVAL '0.001' SECOND   --watermark
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.id.min' = '3',
  'fields.id.max' = '5',
  'fields.vc.min' = '1',
  'fields.vc.max' = '100'
);
1)等值内联结(INNER Equi-JOIN

内联结用INNER JOIN来定义,会返回两表中符合联接条件的所有行的组合,也就是所谓的笛卡尔积(Cartesian product)。目前仅支持等值联结条件。

select ws.id,ws1.id from ws join ws1 on ws.id=ws1.id;

我们可以看到,inner join 必须等待两条流都到齐后才会开始 join,否则就在状态里等着。

2)等值外联结(OUTER Equi-JOIN

与内联结类似,外联结也会返回符合联结条件的所有行的笛卡尔积;另外,还可以将某一侧表中找不到任何匹配的行也单独返回。Flink SQL支持左外(LEFT JOIN)、右外(RIGHT JOIN)和全外(FULL OUTER JOIN),分别表示会将左侧表、右侧表以及双侧表中没有任何匹配的行返回。

具体用法如下:

left join

select ws.id,ws1.id,ws.vc,ws1.vc from ws left join ws1 on ws.vc=ws1.vc;

运行结果:

right join

select ws.id,ws.vc,ws1.id,ws1.vc from ws right join ws1 on ws.vc=ws1.vc;

运行结果:

full join

6.2、间隔联结查询

       我们曾经学习过 DataStream API 中的双流 Join ,包括窗口联结(window join)和间隔联结(interval join)。两条流的Join就对应着SQL中两个表的Join,这是流处理中特有的联结方式。目前Flink SQL还不支持窗口联结,而间隔联结则已经实现。

       间隔联结(Interval Join)返回的同样是符合约束条件的两条中数据的笛卡尔积。只不过这里的“约束条件”除了常规的联结条件外,还多了一个时间间隔的限制。具体语法有以下要点:

  • 两表的联结

       间隔联结不需要用JOIN关键字,直接在FROM后将要联结的两表列出来就可以,用逗号分隔。这与标准SQL中的语法一致,表示一个“交叉联结”(Cross Join),会返回两表中所有行的笛卡尔积。

  • 联结条件

       联结条件用WHERE子句来定义,用一个等值表达式描述。交叉联结之后再用WHERE进行条件筛选,效果跟内联结INNER JOIN ... ON ...非常类似。

  • 时间间隔限制

       我们可以在WHERE子句中,联结条件后用AND追加一个时间间隔的限制条件;做法是提取左右两侧表中的时间字段,然后用一个表达式来指明两者需要满足的间隔限制。具体定义方式有下面三种,这里分别用 ltime 和 rtime 表示左右表中的时间字段:

(1)ltime = rtime

(2)ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE

(3)ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

测试一下:

select * from ws,ws1 where ws.id=ws1.id and ws.et between ws1.et - interval '2' second and ws1.et + interval '2' second;

查看 Web UI :

6.3、维表联结查询

       Lookup Join 其实就是维表 Join,实时获取外部缓存的 Join,Lookup 的意思就是实时查找。

维度表(Dimension Table)是数据仓库中的一种表结构,通常包含有关业务过程的描述性属性。它可以提供多个角度来理解和分析数据。维度表的特点是宽而扁平,每一行代表一个实体,每一列代表一个属性。

比如我们只知道商品的 id ,不知道商品的名字,那么我们的Flink流就需要去 join 通过mysql 的连接器去获取mysql中的数据。

       上面说的这几种 Join 都是流与流之间的 Join,而 Lookup Join 是流与 Redis,Mysql,HBase 这种外部存储介质的 Join。仅支持处理时间字段

表A
-- 固定写法 没有为什么
JOIN 维度表名 FOR SYSTEM_TIME AS OF 表A.proc_time AS 别名
ON xx.字段=别名.字段

比如维表在mysql,维表join的写法如下:

CREATE TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://hadoop102:3306/customerdb',
  'table-name' = 'customers'
);
 
-- order表每来一条数据,都会去mysql的customers表查找维度数据
 
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;    --连接条件

后面我们学习连接器的时候会深入学习。

Flink(十四)【Flink SQL(中)查询】(3)https://developer.aliyun.com/article/1532324

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
8月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
1093 43
|
8月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
506 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
7月前
|
SQL 监控 关系型数据库
一键开启百倍加速!RDS DuckDB 黑科技让SQL查询速度最高提升200倍
RDS MySQL DuckDB分析实例结合事务处理与实时分析能力,显著提升SQL查询性能,最高可达200倍,兼容MySQL语法,无需额外学习成本。
|
7月前
|
SQL 存储 关系型数据库
MySQL体系结构详解:一条SQL查询的旅程
本文深入解析MySQL内部架构,从SQL查询的执行流程到性能优化技巧,涵盖连接建立、查询处理、执行阶段及存储引擎工作机制,帮助开发者理解MySQL运行原理并提升数据库性能。
|
7月前
|
SQL 监控 关系型数据库
SQL优化技巧:让MySQL查询快人一步
本文深入解析了MySQL查询优化的核心技巧,涵盖索引设计、查询重写、分页优化、批量操作、数据类型优化及性能监控等方面,帮助开发者显著提升数据库性能,解决慢查询问题,适用于高并发与大数据场景。
|
8月前
|
SQL XML Java
通过MyBatis的XML配置实现灵活的动态SQL查询
总结而言,通过MyBatis的XML配置实现灵活的动态SQL查询,可以让开发者以声明式的方式构建SQL语句,既保证了SQL操作的灵活性,又简化了代码的复杂度。这种方式可以显著提高数据库操作的效率和代码的可维护性。
491 18
|
6月前
|
SQL 关系型数据库 MySQL
(SQL)SQL语言中的查询语句整理
查询语句在sql中占了挺大一部分篇幅,因为在数据库中使用查询语句的次数远多于更新与删除命令。而查询语句比起其他语句要更加的复杂,可因为sql是数据库不可或缺的一部分,所以即使不懂,也必须得弄懂,以上。
366 0
|
8月前
|
SQL 人工智能 数据库
【三桥君】如何正确使用SQL查询语句:避免常见错误?
三桥君解析了SQL查询中的常见错误和正确用法。AI产品专家三桥君通过三个典型案例:1)属性重复比较错误,应使用IN而非AND;2)WHERE子句中非法使用聚合函数的错误,应改用HAVING;3)正确的分组查询示例。三桥君还介绍了学生、课程和选课三个关系模式,并分析了SQL查询中的属性比较、聚合函数使用和分组查询等关键概念。最后通过实战练习帮助读者巩固知识,强调掌握这些技巧对提升数据库查询效率的重要性。
275 0
|
SQL NoSQL Java
Flink SQL 问题之执行报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
1335 2
|
SQL Java 关系型数据库
Flink SQL 问题之用代码执行报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
1919 6