Flink(十四)【Flink SQL(中)查询】(2)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: Flink(十四)【Flink SQL(中)查询】

Flink(十四)【Flink SQL(中)查询】(1)https://developer.aliyun.com/article/1532319

4、特殊语法 - TopN

       目前在Flink SQL中没有能够直接调用的TOP-N函数,而是提供了稍微复杂些的变通实现方法,是固定写法,特殊支持的over用法。

1)语法

SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
-- 不能指定行范围!!! 只能从最早到当前
FROM table_name)
WHERE rownum <= N [AND conditions]
  • ROW_NUMBER() :标识 TopN 排序子句
  • PARTITION BY col1[, col2...] :标识分区字段,代表按照这个 col 字段作为分区粒度对数据进行排序取 topN,比如下述案例中的 partition by key ,就是根据需求中的搜索关键词(key)做为分区
  • ORDER BY col1 [asc|desc][, col2 [asc|desc]...] :标识 TopN 的排序规则,是按照哪些字段、顺序或逆序进行排序,可以不是时间字段,也可以降序(TopN特殊支持)
  • WHERE rownum <= N :这个子句是一定需要的,只有加上了这个子句,Flink 才能将其识别为一个TopN 的查询,其中 N 代表 TopN 的条目数
  • [AND conditions] :其他的限制条件也可以加上

2)案例

取每个传感器最高的3个水位值

select 
id,
et,
vc,
rownum
from(
    select id,
    vc,
    row_number() over (partition by id order by vc desc ) as rownum
    from ws
)where rownum <= 3;

运行结果:

5、特殊语法 - Deduplication 去重

       这种语法也是借助于 over 窗口,语法和 topN 相似,唯一不同的是它要求排序的字段必须是时间属性列,同样可以降序,不可以是其它非时间属性的列。去重的原理是按时间升序或降序然后取第一条,所以升序和降序很影响性能,我们一般建议升序,因为这样当来了一条相同的数据时,由于按照时间升序,新数据的时间小,不会取代已有的数据;但是如果是降序的话,新数据的时间比较晚会被放到第一条数据的位置,就要发生更新操作;所以我们一般使用升序。

       在 row_number = 1 时,如果排序字段是普通列 planner 会翻译成 TopN 算子,如果是时间属性列 planner 会翻译成 Deduplication,这两者最终的执行算子是不一样的,Deduplication 相比 TopN 算子专门做了对应的优化,性能会有很大提升。可以从web UI 看出是翻译成哪种算子(TopN 的算子叫 Rank、Deduplication 的算子叫 Deduplication、普通聚合(比如sum、avg)的算子叫 OverAggregate)。

如果是按照时间属性字段降序,表示取最新一条,会造成不断的更新保存最新的一条。如果是升序,表示取最早的一条,不用去更新,性能更好。

1)语法

SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY time_attr [asc|desc]) AS rownum
FROM table_name)
WHERE rownum = 1;

2)案例

对每个传感器的水位值去重

select
id,
et,
vc,
rownum
from (
    select 
    id,
    et,
    vc,
    row_number() over(partition by id,vc order by et asc) as rownum
    from ws 
)where rownum = 1;

运行结果:

我们发现所有的结果的操作都是 +I  说明没有重复数据,都是新插入的数据。

6、联结查询

       在标准SQL中,可以将多个表连接合并起来,从中查询出想要的信息;这种操作就是表的联结(Join)。在Flink SQL中,同样支持各种灵活的联结(Join)查询,操作的对象是动态表。

       在流处理中,动态表的Join对应着两条数据流的Join操作。Flink SQL中的联结查询大体上也可以分为两类:SQL原生的联结查询方式,和流处理中特有的联结查询。

注意:什么 join 就以谁为主,left join 就是以左边的流为主,右边的流来了只能老老实实待着,待在状态里没有资格输出。right join 同理。

6.1、常规联结查询

       常规联结(Regular Join)是SQL中原生定义的Join方式,是最通用的一类联结操作。它的具体语法与标准SQL的联结完全相同,通过关键字JOIN来联结两个表,后面用关键字ON来指明联结条件。

       与标准SQL一致,Flink SQL的常规联结也可以分为内联结(INNER JOIN)和外联结(OUTER JOIN),区别在于结果中是否包含不符合联结条件的行。

Regular Join 包含以下几种(以 L 作为左流中的数据标识, R 作为右流中的数据标识):

  • Inner Join(Inner Equal Join):流任务中,只有两条流 Join 到才输出,输出 +[L, R]
  • Left Join(Outer Equal Join):流任务中,左流数据到达之后,无论有没有 Join 到右流的数据,都会输出(Join 到输出 +[L, R] ,没 Join 到输出 +[L, null] ),如果右流之后数据到达之后,发现左流之前输出过没有 Join 到的数据,则会发起回撤流,先输出 -[L, null] ,然后输出 +[L, R]
  • Right Join(Outer Equal Join):有 Left Join 一样,左表和右表的执行逻辑完全相反
  • Full Join(Outer Equal Join):流任务中,左流或者右流的数据到达之后,无论有没有 Join 到另外一条流的数据,都会输出(对右流来说:Join 到输出 +[L, R] ,没 Join 到输出 +[null, R] ;对左流来说:Join 到输出 +[L, R] ,没 Join 到输出 +[L, null] )。如果一条流的数据到达之后,发现之前另一条流之前输出过没有 Join 到的数据,则会发起回撤流(左流数据到达为例:回撤 -[null, R] ,输出+[L, R] ,右流数据到达为例:回撤 -[L, null] ,输出 +[L, R]

Regular Join 的注意事项:

  • 实时 Regular Join 可以不是 等值 join 。等值 join 和 非等值 join 区别在于, 等值 join数据 shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游; 非等值 join 数据 shuffle 策略是 Global,所有数据发往一个并发,按照非等值条件进行关联。
  • 流的上游是无限的数据,所以要做到关联的话,Flink 会将两条流的所有数据都存储在 State 中,所以 Flink 任务的 State 会无限增大,因此你需要为 State 配置合适的 TTL,以防止 State 过大
  • join 的时候,左右两条流中的数据都会先存到状态中去,先来的数据会去对方的状态中去查找有没有可以匹配上的,没有就匹配为 null。

我们再准备一张表用于join:

CREATE TABLE ws1 (
  id INT,
  vc INT,
  pt AS PROCTIME(), --处理时间
  et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件时间
  WATERMARK FOR et AS et - INTERVAL '0.001' SECOND   --watermark
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.id.min' = '3',
  'fields.id.max' = '5',
  'fields.vc.min' = '1',
  'fields.vc.max' = '100'
);
1)等值内联结(INNER Equi-JOIN

内联结用INNER JOIN来定义,会返回两表中符合联接条件的所有行的组合,也就是所谓的笛卡尔积(Cartesian product)。目前仅支持等值联结条件。

select ws.id,ws1.id from ws join ws1 on ws.id=ws1.id;

我们可以看到,inner join 必须等待两条流都到齐后才会开始 join,否则就在状态里等着。

2)等值外联结(OUTER Equi-JOIN

与内联结类似,外联结也会返回符合联结条件的所有行的笛卡尔积;另外,还可以将某一侧表中找不到任何匹配的行也单独返回。Flink SQL支持左外(LEFT JOIN)、右外(RIGHT JOIN)和全外(FULL OUTER JOIN),分别表示会将左侧表、右侧表以及双侧表中没有任何匹配的行返回。

具体用法如下:

left join

select ws.id,ws1.id,ws.vc,ws1.vc from ws left join ws1 on ws.vc=ws1.vc;

运行结果:

right join

select ws.id,ws.vc,ws1.id,ws1.vc from ws right join ws1 on ws.vc=ws1.vc;

运行结果:

full join

6.2、间隔联结查询

       我们曾经学习过 DataStream API 中的双流 Join ,包括窗口联结(window join)和间隔联结(interval join)。两条流的Join就对应着SQL中两个表的Join,这是流处理中特有的联结方式。目前Flink SQL还不支持窗口联结,而间隔联结则已经实现。

       间隔联结(Interval Join)返回的同样是符合约束条件的两条中数据的笛卡尔积。只不过这里的“约束条件”除了常规的联结条件外,还多了一个时间间隔的限制。具体语法有以下要点:

  • 两表的联结

       间隔联结不需要用JOIN关键字,直接在FROM后将要联结的两表列出来就可以,用逗号分隔。这与标准SQL中的语法一致,表示一个“交叉联结”(Cross Join),会返回两表中所有行的笛卡尔积。

  • 联结条件

       联结条件用WHERE子句来定义,用一个等值表达式描述。交叉联结之后再用WHERE进行条件筛选,效果跟内联结INNER JOIN ... ON ...非常类似。

  • 时间间隔限制

       我们可以在WHERE子句中,联结条件后用AND追加一个时间间隔的限制条件;做法是提取左右两侧表中的时间字段,然后用一个表达式来指明两者需要满足的间隔限制。具体定义方式有下面三种,这里分别用 ltime 和 rtime 表示左右表中的时间字段:

(1)ltime = rtime

(2)ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE

(3)ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

测试一下:

select * from ws,ws1 where ws.id=ws1.id and ws.et between ws1.et - interval '2' second and ws1.et + interval '2' second;

查看 Web UI :

6.3、维表联结查询

       Lookup Join 其实就是维表 Join,实时获取外部缓存的 Join,Lookup 的意思就是实时查找。

维度表(Dimension Table)是数据仓库中的一种表结构,通常包含有关业务过程的描述性属性。它可以提供多个角度来理解和分析数据。维度表的特点是宽而扁平,每一行代表一个实体,每一列代表一个属性。

比如我们只知道商品的 id ,不知道商品的名字,那么我们的Flink流就需要去 join 通过mysql 的连接器去获取mysql中的数据。

       上面说的这几种 Join 都是流与流之间的 Join,而 Lookup Join 是流与 Redis,Mysql,HBase 这种外部存储介质的 Join。仅支持处理时间字段

表A
-- 固定写法 没有为什么
JOIN 维度表名 FOR SYSTEM_TIME AS OF 表A.proc_time AS 别名
ON xx.字段=别名.字段

比如维表在mysql,维表join的写法如下:

CREATE TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://hadoop102:3306/customerdb',
  'table-name' = 'customers'
);
 
-- order表每来一条数据,都会去mysql的customers表查找维度数据
 
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;    --连接条件

后面我们学习连接器的时候会深入学习。

Flink(十四)【Flink SQL(中)查询】(3)https://developer.aliyun.com/article/1532324

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
SQL 数据挖掘 数据库
第三篇:高级 SQL 查询与多表操作
本文深入讲解高级SQL查询技巧,涵盖多表JOIN操作、聚合函数、分组查询、子查询及视图索引等内容。适合已掌握基础SQL的学习者,通过实例解析INNER/LEFT/RIGHT/FULL JOIN用法,以及COUNT/SUM/AVG等聚合函数的应用。同时探讨复杂WHERE条件、子查询嵌套,并介绍视图简化查询与索引优化性能的方法。最后提供实践建议与学习资源,助你提升SQL技能以应对实际数据处理需求。
167 1
|
4月前
|
SQL 运维 监控
SQL查询太慢?实战讲解YashanDB SQL调优思路
本文是Meetup第十期“调优实战专场”的第二篇技术文章,上一篇《高效查询秘诀,解码YashanDB优化器分组查询优化手段》中,我们揭秘了YashanDB分组查询优化秘诀,本文将通过一个案例,助你快速上手YashanDB慢日志功能,精准定位“慢SQL”后进行优化。
|
4月前
|
SQL 索引
【YashanDB知识库】字段加上索引后,SQL查询不到结果
【YashanDB知识库】字段加上索引后,SQL查询不到结果
|
2月前
|
SQL 关系型数据库 MySQL
凌晨2点报警群炸了:一条sql 执行200秒!搞定之后,我总结了一个慢SQL查询、定位分析解决的完整套路
凌晨2点报警群炸了:一条sql 执行200秒!搞定之后,我总结了一个慢SQL查询、定位分析解决的完整套路
凌晨2点报警群炸了:一条sql 执行200秒!搞定之后,我总结了一个慢SQL查询、定位分析解决的完整套路
|
4月前
|
SQL 人工智能 自然语言处理
OmniSQL:开源文本到SQL神器!自然语言秒转查询到复杂多表连接等SQL需求
OmniSQL是开源的文本到SQL转换模型,通过创新的数据合成框架生成250万条高质量样本,支持7B/14B/32B三种模型版本,能处理从简单查询到复杂多表连接等各种SQL需求。
384 16
OmniSQL:开源文本到SQL神器!自然语言秒转查询到复杂多表连接等SQL需求
|
4月前
|
SQL 大数据 数据挖掘
玩转大数据:从零开始掌握SQL查询基础
玩转大数据:从零开始掌握SQL查询基础
208 35
|
4月前
|
SQL 关系型数据库 MySQL
如何优化SQL查询以提高数据库性能?
这篇文章以生动的比喻介绍了优化SQL查询的重要性及方法。它首先将未优化的SQL查询比作在自助餐厅贪多嚼不烂的行为,强调了只获取必要数据的必要性。接着,文章详细讲解了四种优化策略:**精简选择**(避免使用`SELECT *`)、**专业筛选**(利用`WHERE`缩小范围)、**高效联接**(索引和限制数据量)以及**使用索引**(加速搜索)。此外,还探讨了如何避免N+1查询问题、使用分页限制结果、理解执行计划以及定期维护数据库健康。通过这些技巧,可以显著提升数据库性能,让查询更高效流畅。
|
5月前
|
SQL 关系型数据库 OLAP
云原生数据仓库AnalyticDB PostgreSQL同一个SQL可以实现向量索引、全文索引GIN、普通索引BTREE混合查询,简化业务实现逻辑、提升查询性能
本文档介绍了如何在AnalyticDB for PostgreSQL中创建表、向量索引及混合检索的实现步骤。主要内容包括:创建`articles`表并设置向量存储格式,创建ANN向量索引,为表增加`username`和`time`列,建立BTREE索引和GIN全文检索索引,并展示了查询结果。参考文档提供了详细的SQL语句和配置说明。
130 2
|
4月前
|
SQL 缓存 关系型数据库
SQL为什么不建议执行多表关联查询
本文探讨了SQL中不建议执行多表关联查询的原因,特别是MySQL与PG在多表关联上的区别。MySQL仅支持嵌套循环连接,而不支持排序-合并连接和散列连接,因此在多表(超过3张)关联查询时效率较低。文章还分析了多表关联查询与多次单表查询的效率对比,指出将关联操作放在Service层处理的优势,包括减少数据库计算资源消耗、提高缓存效率、降低锁竞争以及更易于分布式扩展等。最后,通过实例展示了如何分解关联查询以优化性能。
136 0
|
5月前
|
SQL 数据可视化 IDE
SQL做数据分析的困境,查询语言无法回答的真相
SQL 在简单数据分析任务中表现良好,但面对复杂需求时显得力不从心。例如,统计新用户第二天的留存率或连续活跃用户的计算,SQL 需要嵌套子查询和复杂关联,代码冗长难懂。Python 虽更灵活,但仍需变通思路,复杂度较高。相比之下,SPL(Structured Process Language)语法简洁、支持有序计算和分组子集保留,具备强大的交互性和调试功能,适合处理复杂的深度数据分析任务。SPL 已开源免费,是数据分析师的更好选择。