Flink(十四)【Flink SQL(中)查询】(1)https://developer.aliyun.com/article/1532319
4、特殊语法 - TopN
目前在Flink SQL中没有能够直接调用的TOP-N函数,而是提供了稍微复杂些的变通实现方法,是固定写法,特殊支持的over用法。
1)语法
SELECT [column_list] FROM ( SELECT [column_list], ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum -- 不能指定行范围!!! 只能从最早到当前 FROM table_name) WHERE rownum <= N [AND conditions]
- ROW_NUMBER() :标识 TopN 排序子句
- PARTITION BY col1[, col2...] :标识分区字段,代表按照这个 col 字段作为分区粒度对数据进行排序取 topN,比如下述案例中的 partition by key ,就是根据需求中的搜索关键词(key)做为分区
- ORDER BY col1 [asc|desc][, col2 [asc|desc]...] :标识 TopN 的排序规则,是按照哪些字段、顺序或逆序进行排序,可以不是时间字段,也可以降序(TopN特殊支持)
- WHERE rownum <= N :这个子句是一定需要的,只有加上了这个子句,Flink 才能将其识别为一个TopN 的查询,其中 N 代表 TopN 的条目数
- [AND conditions] :其他的限制条件也可以加上
2)案例
取每个传感器最高的3个水位值
select id, et, vc, rownum from( select id, vc, row_number() over (partition by id order by vc desc ) as rownum from ws )where rownum <= 3;
运行结果:
5、特殊语法 - Deduplication 去重
这种语法也是借助于 over 窗口,语法和 topN 相似,唯一不同的是它要求排序的字段必须是时间属性列,同样可以降序,不可以是其它非时间属性的列。去重的原理是按时间升序或降序然后取第一条,所以升序和降序很影响性能,我们一般建议升序,因为这样当来了一条相同的数据时,由于按照时间升序,新数据的时间小,不会取代已有的数据;但是如果是降序的话,新数据的时间比较晚会被放到第一条数据的位置,就要发生更新操作;所以我们一般使用升序。
在 row_number = 1 时,如果排序字段是普通列 planner 会翻译成 TopN 算子,如果是时间属性列 planner 会翻译成 Deduplication,这两者最终的执行算子是不一样的,Deduplication 相比 TopN 算子专门做了对应的优化,性能会有很大提升。可以从web UI 看出是翻译成哪种算子(TopN 的算子叫 Rank、Deduplication 的算子叫 Deduplication、普通聚合(比如sum、avg)的算子叫 OverAggregate)。
如果是按照时间属性字段降序,表示取最新一条,会造成不断的更新保存最新的一条。如果是升序,表示取最早的一条,不用去更新,性能更好。
1)语法
SELECT [column_list] FROM ( SELECT [column_list], ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY time_attr [asc|desc]) AS rownum FROM table_name) WHERE rownum = 1;
2)案例
对每个传感器的水位值去重
select id, et, vc, rownum from ( select id, et, vc, row_number() over(partition by id,vc order by et asc) as rownum from ws )where rownum = 1;
运行结果:
我们发现所有的结果的操作都是 +I 说明没有重复数据,都是新插入的数据。
6、联结查询
在标准SQL中,可以将多个表连接合并起来,从中查询出想要的信息;这种操作就是表的联结(Join)。在Flink SQL中,同样支持各种灵活的联结(Join)查询,操作的对象是动态表。
在流处理中,动态表的Join对应着两条数据流的Join操作。Flink SQL中的联结查询大体上也可以分为两类:SQL原生的联结查询方式,和流处理中特有的联结查询。
注意:什么 join 就以谁为主,left join 就是以左边的流为主,右边的流来了只能老老实实待着,待在状态里没有资格输出。right join 同理。
6.1、常规联结查询
常规联结(Regular Join)是SQL中原生定义的Join方式,是最通用的一类联结操作。它的具体语法与标准SQL的联结完全相同,通过关键字JOIN来联结两个表,后面用关键字ON来指明联结条件。
与标准SQL一致,Flink SQL的常规联结也可以分为内联结(INNER JOIN)和外联结(OUTER JOIN),区别在于结果中是否包含不符合联结条件的行。
Regular Join 包含以下几种(以 L 作为左流中的数据标识, R 作为右流中的数据标识):
- Inner Join(Inner Equal Join):流任务中,只有两条流 Join 到才输出,输出 +[L, R]
- Left Join(Outer Equal Join):流任务中,左流数据到达之后,无论有没有 Join 到右流的数据,都会输出(Join 到输出 +[L, R] ,没 Join 到输出 +[L, null] ),如果右流之后数据到达之后,发现左流之前输出过没有 Join 到的数据,则会发起回撤流,先输出 -[L, null] ,然后输出 +[L, R]
- Right Join(Outer Equal Join):有 Left Join 一样,左表和右表的执行逻辑完全相反
- Full Join(Outer Equal Join):流任务中,左流或者右流的数据到达之后,无论有没有 Join 到另外一条流的数据,都会输出(对右流来说:Join 到输出 +[L, R] ,没 Join 到输出 +[null, R] ;对左流来说:Join 到输出 +[L, R] ,没 Join 到输出 +[L, null] )。如果一条流的数据到达之后,发现之前另一条流之前输出过没有 Join 到的数据,则会发起回撤流(左流数据到达为例:回撤 -[null, R] ,输出+[L, R] ,右流数据到达为例:回撤 -[L, null] ,输出 +[L, R]
Regular Join 的注意事项:
- 实时 Regular Join 可以不是 等值 join 。等值 join 和 非等值 join 区别在于, 等值 join数据 shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游; 非等值 join 数据 shuffle 策略是 Global,所有数据发往一个并发,按照非等值条件进行关联。
- 流的上游是无限的数据,所以要做到关联的话,Flink 会将两条流的所有数据都存储在 State 中,所以 Flink 任务的 State 会无限增大,因此你需要为 State 配置合适的 TTL,以防止 State 过大。
- join 的时候,左右两条流中的数据都会先存到状态中去,先来的数据会去对方的状态中去查找有没有可以匹配上的,没有就匹配为 null。
我们再准备一张表用于join:
CREATE TABLE ws1 ( id INT, vc INT, pt AS PROCTIME(), --处理时间 et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件时间 WATERMARK FOR et AS et - INTERVAL '0.001' SECOND --watermark ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.id.min' = '3', 'fields.id.max' = '5', 'fields.vc.min' = '1', 'fields.vc.max' = '100' );
1)等值内联结(INNER Equi-JOIN)
内联结用INNER JOIN来定义,会返回两表中符合联接条件的所有行的组合,也就是所谓的笛卡尔积(Cartesian product)。目前仅支持等值联结条件。
select ws.id,ws1.id from ws join ws1 on ws.id=ws1.id;
我们可以看到,inner join 必须等待两条流都到齐后才会开始 join,否则就在状态里等着。
2)等值外联结(OUTER Equi-JOIN)
与内联结类似,外联结也会返回符合联结条件的所有行的笛卡尔积;另外,还可以将某一侧表中找不到任何匹配的行也单独返回。Flink SQL支持左外(LEFT JOIN)、右外(RIGHT JOIN)和全外(FULL OUTER JOIN),分别表示会将左侧表、右侧表以及双侧表中没有任何匹配的行返回。
具体用法如下:
left join:
select ws.id,ws1.id,ws.vc,ws1.vc from ws left join ws1 on ws.vc=ws1.vc;
运行结果:
right join:
select ws.id,ws.vc,ws1.id,ws1.vc from ws right join ws1 on ws.vc=ws1.vc;
运行结果:
full join:
6.2、间隔联结查询
我们曾经学习过 DataStream API 中的双流 Join ,包括窗口联结(window join)和间隔联结(interval join)。两条流的Join就对应着SQL中两个表的Join,这是流处理中特有的联结方式。目前Flink SQL还不支持窗口联结,而间隔联结则已经实现。
间隔联结(Interval Join)返回的同样是符合约束条件的两条中数据的笛卡尔积。只不过这里的“约束条件”除了常规的联结条件外,还多了一个时间间隔的限制。具体语法有以下要点:
- 两表的联结
间隔联结不需要用JOIN关键字,直接在FROM后将要联结的两表列出来就可以,用逗号分隔。这与标准SQL中的语法一致,表示一个“交叉联结”(Cross Join),会返回两表中所有行的笛卡尔积。
- 联结条件
联结条件用WHERE子句来定义,用一个等值表达式描述。交叉联结之后再用WHERE进行条件筛选,效果跟内联结INNER JOIN ... ON ...非常类似。
- 时间间隔限制
我们可以在WHERE子句中,联结条件后用AND追加一个时间间隔的限制条件;做法是提取左右两侧表中的时间字段,然后用一个表达式来指明两者需要满足的间隔限制。具体定义方式有下面三种,这里分别用 ltime 和 rtime 表示左右表中的时间字段:
(1)ltime = rtime
(2)ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
(3)ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
测试一下:
select * from ws,ws1 where ws.id=ws1.id and ws.et between ws1.et - interval '2' second and ws1.et + interval '2' second;
查看 Web UI :
6.3、维表联结查询
Lookup Join 其实就是维表 Join,实时获取外部缓存的 Join,Lookup 的意思就是实时查找。
维度表(Dimension Table)是数据仓库中的一种表结构,通常包含有关业务过程的描述性属性。它可以提供多个角度来理解和分析数据。维度表的特点是宽而扁平,每一行代表一个实体,每一列代表一个属性。
比如我们只知道商品的 id ,不知道商品的名字,那么我们的Flink流就需要去 join 通过mysql 的连接器去获取mysql中的数据。
上面说的这几种 Join 都是流与流之间的 Join,而 Lookup Join 是流与 Redis,Mysql,HBase 这种外部存储介质的 Join。仅支持处理时间字段。
表A -- 固定写法 没有为什么 JOIN 维度表名 FOR SYSTEM_TIME AS OF 表A.proc_time AS 别名 ON xx.字段=别名.字段
比如维表在mysql,维表join的写法如下:
CREATE TABLE Customers ( id INT, name STRING, country STRING, zip STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://hadoop102:3306/customerdb', 'table-name' = 'customers' ); -- order表每来一条数据,都会去mysql的customers表查找维度数据 SELECT o.order_id, o.total, c.country, c.zip FROM Orders AS o JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.customer_id = c.id; --连接条件
后面我们学习连接器的时候会深入学习。
Flink(十四)【Flink SQL(中)查询】(3)https://developer.aliyun.com/article/1532324