Flink(十四)【Flink SQL(中)查询】(2)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
简介: Flink(十四)【Flink SQL(中)查询】

Flink(十四)【Flink SQL(中)查询】(1)https://developer.aliyun.com/article/1532319

4、特殊语法 - TopN

       目前在Flink SQL中没有能够直接调用的TOP-N函数,而是提供了稍微复杂些的变通实现方法,是固定写法,特殊支持的over用法。

1)语法

SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
-- 不能指定行范围!!! 只能从最早到当前
FROM table_name)
WHERE rownum <= N [AND conditions]
  • ROW_NUMBER() :标识 TopN 排序子句
  • PARTITION BY col1[, col2...] :标识分区字段,代表按照这个 col 字段作为分区粒度对数据进行排序取 topN,比如下述案例中的 partition by key ,就是根据需求中的搜索关键词(key)做为分区
  • ORDER BY col1 [asc|desc][, col2 [asc|desc]...] :标识 TopN 的排序规则,是按照哪些字段、顺序或逆序进行排序,可以不是时间字段,也可以降序(TopN特殊支持)
  • WHERE rownum <= N :这个子句是一定需要的,只有加上了这个子句,Flink 才能将其识别为一个TopN 的查询,其中 N 代表 TopN 的条目数
  • [AND conditions] :其他的限制条件也可以加上

2)案例

取每个传感器最高的3个水位值

select 
id,
et,
vc,
rownum
from(
    select id,
    vc,
    row_number() over (partition by id order by vc desc ) as rownum
    from ws
)where rownum <= 3;

运行结果:

5、特殊语法 - Deduplication 去重

       这种语法也是借助于 over 窗口,语法和 topN 相似,唯一不同的是它要求排序的字段必须是时间属性列,同样可以降序,不可以是其它非时间属性的列。去重的原理是按时间升序或降序然后取第一条,所以升序和降序很影响性能,我们一般建议升序,因为这样当来了一条相同的数据时,由于按照时间升序,新数据的时间小,不会取代已有的数据;但是如果是降序的话,新数据的时间比较晚会被放到第一条数据的位置,就要发生更新操作;所以我们一般使用升序。

       在 row_number = 1 时,如果排序字段是普通列 planner 会翻译成 TopN 算子,如果是时间属性列 planner 会翻译成 Deduplication,这两者最终的执行算子是不一样的,Deduplication 相比 TopN 算子专门做了对应的优化,性能会有很大提升。可以从web UI 看出是翻译成哪种算子(TopN 的算子叫 Rank、Deduplication 的算子叫 Deduplication、普通聚合(比如sum、avg)的算子叫 OverAggregate)。

如果是按照时间属性字段降序,表示取最新一条,会造成不断的更新保存最新的一条。如果是升序,表示取最早的一条,不用去更新,性能更好。

1)语法

SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY time_attr [asc|desc]) AS rownum
FROM table_name)
WHERE rownum = 1;

2)案例

对每个传感器的水位值去重

select
id,
et,
vc,
rownum
from (
    select 
    id,
    et,
    vc,
    row_number() over(partition by id,vc order by et asc) as rownum
    from ws 
)where rownum = 1;

运行结果:

我们发现所有的结果的操作都是 +I  说明没有重复数据,都是新插入的数据。

6、联结查询

       在标准SQL中,可以将多个表连接合并起来,从中查询出想要的信息;这种操作就是表的联结(Join)。在Flink SQL中,同样支持各种灵活的联结(Join)查询,操作的对象是动态表。

       在流处理中,动态表的Join对应着两条数据流的Join操作。Flink SQL中的联结查询大体上也可以分为两类:SQL原生的联结查询方式,和流处理中特有的联结查询。

注意:什么 join 就以谁为主,left join 就是以左边的流为主,右边的流来了只能老老实实待着,待在状态里没有资格输出。right join 同理。

6.1、常规联结查询

       常规联结(Regular Join)是SQL中原生定义的Join方式,是最通用的一类联结操作。它的具体语法与标准SQL的联结完全相同,通过关键字JOIN来联结两个表,后面用关键字ON来指明联结条件。

       与标准SQL一致,Flink SQL的常规联结也可以分为内联结(INNER JOIN)和外联结(OUTER JOIN),区别在于结果中是否包含不符合联结条件的行。

Regular Join 包含以下几种(以 L 作为左流中的数据标识, R 作为右流中的数据标识):

  • Inner Join(Inner Equal Join):流任务中,只有两条流 Join 到才输出,输出 +[L, R]
  • Left Join(Outer Equal Join):流任务中,左流数据到达之后,无论有没有 Join 到右流的数据,都会输出(Join 到输出 +[L, R] ,没 Join 到输出 +[L, null] ),如果右流之后数据到达之后,发现左流之前输出过没有 Join 到的数据,则会发起回撤流,先输出 -[L, null] ,然后输出 +[L, R]
  • Right Join(Outer Equal Join):有 Left Join 一样,左表和右表的执行逻辑完全相反
  • Full Join(Outer Equal Join):流任务中,左流或者右流的数据到达之后,无论有没有 Join 到另外一条流的数据,都会输出(对右流来说:Join 到输出 +[L, R] ,没 Join 到输出 +[null, R] ;对左流来说:Join 到输出 +[L, R] ,没 Join 到输出 +[L, null] )。如果一条流的数据到达之后,发现之前另一条流之前输出过没有 Join 到的数据,则会发起回撤流(左流数据到达为例:回撤 -[null, R] ,输出+[L, R] ,右流数据到达为例:回撤 -[L, null] ,输出 +[L, R]

Regular Join 的注意事项:

  • 实时 Regular Join 可以不是 等值 join 。等值 join 和 非等值 join 区别在于, 等值 join数据 shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游; 非等值 join 数据 shuffle 策略是 Global,所有数据发往一个并发,按照非等值条件进行关联。
  • 流的上游是无限的数据,所以要做到关联的话,Flink 会将两条流的所有数据都存储在 State 中,所以 Flink 任务的 State 会无限增大,因此你需要为 State 配置合适的 TTL,以防止 State 过大
  • join 的时候,左右两条流中的数据都会先存到状态中去,先来的数据会去对方的状态中去查找有没有可以匹配上的,没有就匹配为 null。

我们再准备一张表用于join:

CREATE TABLE ws1 (
  id INT,
  vc INT,
  pt AS PROCTIME(), --处理时间
  et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件时间
  WATERMARK FOR et AS et - INTERVAL '0.001' SECOND   --watermark
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.id.min' = '3',
  'fields.id.max' = '5',
  'fields.vc.min' = '1',
  'fields.vc.max' = '100'
);
1)等值内联结(INNER Equi-JOIN

内联结用INNER JOIN来定义,会返回两表中符合联接条件的所有行的组合,也就是所谓的笛卡尔积(Cartesian product)。目前仅支持等值联结条件。

select ws.id,ws1.id from ws join ws1 on ws.id=ws1.id;

我们可以看到,inner join 必须等待两条流都到齐后才会开始 join,否则就在状态里等着。

2)等值外联结(OUTER Equi-JOIN

与内联结类似,外联结也会返回符合联结条件的所有行的笛卡尔积;另外,还可以将某一侧表中找不到任何匹配的行也单独返回。Flink SQL支持左外(LEFT JOIN)、右外(RIGHT JOIN)和全外(FULL OUTER JOIN),分别表示会将左侧表、右侧表以及双侧表中没有任何匹配的行返回。

具体用法如下:

left join

select ws.id,ws1.id,ws.vc,ws1.vc from ws left join ws1 on ws.vc=ws1.vc;

运行结果:

right join

select ws.id,ws.vc,ws1.id,ws1.vc from ws right join ws1 on ws.vc=ws1.vc;

运行结果:

full join

6.2、间隔联结查询

       我们曾经学习过 DataStream API 中的双流 Join ,包括窗口联结(window join)和间隔联结(interval join)。两条流的Join就对应着SQL中两个表的Join,这是流处理中特有的联结方式。目前Flink SQL还不支持窗口联结,而间隔联结则已经实现。

       间隔联结(Interval Join)返回的同样是符合约束条件的两条中数据的笛卡尔积。只不过这里的“约束条件”除了常规的联结条件外,还多了一个时间间隔的限制。具体语法有以下要点:

  • 两表的联结

       间隔联结不需要用JOIN关键字,直接在FROM后将要联结的两表列出来就可以,用逗号分隔。这与标准SQL中的语法一致,表示一个“交叉联结”(Cross Join),会返回两表中所有行的笛卡尔积。

  • 联结条件

       联结条件用WHERE子句来定义,用一个等值表达式描述。交叉联结之后再用WHERE进行条件筛选,效果跟内联结INNER JOIN ... ON ...非常类似。

  • 时间间隔限制

       我们可以在WHERE子句中,联结条件后用AND追加一个时间间隔的限制条件;做法是提取左右两侧表中的时间字段,然后用一个表达式来指明两者需要满足的间隔限制。具体定义方式有下面三种,这里分别用 ltime 和 rtime 表示左右表中的时间字段:

(1)ltime = rtime

(2)ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE

(3)ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

测试一下:

select * from ws,ws1 where ws.id=ws1.id and ws.et between ws1.et - interval '2' second and ws1.et + interval '2' second;

查看 Web UI :

6.3、维表联结查询

       Lookup Join 其实就是维表 Join,实时获取外部缓存的 Join,Lookup 的意思就是实时查找。

维度表(Dimension Table)是数据仓库中的一种表结构,通常包含有关业务过程的描述性属性。它可以提供多个角度来理解和分析数据。维度表的特点是宽而扁平,每一行代表一个实体,每一列代表一个属性。

比如我们只知道商品的 id ,不知道商品的名字,那么我们的Flink流就需要去 join 通过mysql 的连接器去获取mysql中的数据。

       上面说的这几种 Join 都是流与流之间的 Join,而 Lookup Join 是流与 Redis,Mysql,HBase 这种外部存储介质的 Join。仅支持处理时间字段

表A
-- 固定写法 没有为什么
JOIN 维度表名 FOR SYSTEM_TIME AS OF 表A.proc_time AS 别名
ON xx.字段=别名.字段

比如维表在mysql,维表join的写法如下:

CREATE TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://hadoop102:3306/customerdb',
  'table-name' = 'customers'
);
 
-- order表每来一条数据,都会去mysql的customers表查找维度数据
 
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;    --连接条件

后面我们学习连接器的时候会深入学习。

Flink(十四)【Flink SQL(中)查询】(3)https://developer.aliyun.com/article/1532324

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
9天前
|
SQL 存储 程序员
SQL查询的一些基本知识和学习指导
【6月更文挑战第17天】SQL查询核心包括基础选择、连接(JOIN)、子查询、聚合函数与GROUP BY、模糊匹配(LIKE)、分页与排序。JOIN操作连接多表,GROUP BY配合聚合函数做统计,LIKE用于模糊搜索。理解存储过程、触发器及自动增长列等进阶概念,通过实践提升SQL技能。
35 2
|
2天前
|
SQL 机器学习/深度学习 分布式计算
MaxCompute产品使用问题之如何调整改变SQL查询的严格性
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
3天前
|
SQL 分布式计算 大数据
MaxCompute产品使用问题之如果oss文件过大,如何在不调整oss源文件大小的情况下优化查询sql
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
3天前
|
SQL 分布式计算 DataWorks
MaxCompute产品使用问题之如何通过临时查询功能来书写和运行SQL语句
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
5天前
|
SQL 前端开发 关系型数据库
零基础学习数据库SQL语句之查询表中数据的DQL语句
零基础学习数据库SQL语句之查询表中数据的DQL语句
6 0
|
9天前
|
SQL 关系型数据库 数据库
实时计算 Flink版操作报错合集之在本地执行代码没有问题,但是在线执行sql命令就会报错,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
9天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在执行SQL语句时遇到了类找不到,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
13天前
|
SQL
SQL查询
SQL查询
16 0
|
9天前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
582 0
|
9天前
|
存储 SQL 关系型数据库
实时计算 Flink版操作报错合集之按时间恢复时,报错:在尝试读取binlog时发现所需的binlog位置不再可用,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
509 0