Flink(十四)【Flink SQL(中)查询】(2)https://developer.aliyun.com/article/1532320
7、Order by 和 limit
1)order by
支持 Batch\Streaming,但在实时任务中一般用的非常少。
实时任务中,Order By 子句中必须要有时间属性字段,并且必须写在最前面且为升序。
-- 时间属性必须为升序asc select * from ws order by et,id desc; - 我们指定id字段降序
运行结果:
2)limit
select * form ws limit 3;
运行结果:
8、SQL Hints
翻译过来叫 SQL 暗示,确实语法就像我们 Java 中的注释一样。
在执行查询时,可以在表名后面添加SQL Hints来临时修改表属性,对当前job生效。放我们定义好一张 Flink SQL 表的时候,如果要修改参数,比如数据生成器产生数据的速度,我们不可能每次都把表删了重建,那样太麻烦了。我们可以使用 SQL Hints,它主要修改的就是我们建表时 with的参数。
-- 修改数据生成器产生数据的速度 select * from ws1/*+ OPTIONS('rows-per-second'='10')*/;
9、集合操作
1)UNION 和 UNION ALL(合并)
将两张表上下拼接在一起,要求必须字段必须一样,至少字段数量一样,而且每个位置的字段类型是一样的。
- UNION:将集合合并并且去重
- UNION ALL:将集合合并,不做去重。
select id from ws union select id from ws1;
运行结果:
可以看到结果是去重的,因为我们 id 的范围(ws: 1-3,ws1:3-5)。
select id from ws union all select id from ws1;
运行结果:
可以看到,union all 不去重。
2)Intersect 和 Intersect All
- Intersect:交集并且去重
- Intersect ALL:交集不做去重
select id from ws intersect select id from ws1;
运行结果:
可以看到,运行结果就是我们两张表的共同 id 。
select id from ws intersect all select id from ws1;
运行结果:
可以看到,结果没有去重。
3)Except 和 Except All
- Except:差集并且去重
- Except ALL:差集不做去重
select id from ws except select id from ws1;
运行结果:
可以看到,左边ws流中 id=3 的数据到了,但是右边ws1流中没有,于是认为是差值输出,直到ws1中出现了id=3的数据,发现这不是差值,于是撤回。
而且结果是相对于左边 ws 流的,也就是只会输出 ws 流中有而 ws1 没有的数据。
select id from ws except all select id from ws1;
运行结果:
上述 SQL 在流式任务中,如果一条左流数据先来了,没有从右流集合数据中找到对应的数据时会直接输出,当右流对应数据后续来了之后,会下发回撤流将之前的数据給撤回。这也是一个回撤流。
4)In 子查询
- In 子查询的结果集只能有一列
select id,vc from ws where id in (select id from ws1);
运行结果:
上述 SQL 的 In 子句和之前介绍到的 Inner Join 类似。并且 In 子查询也会涉及到大状态问题,要注意设置 State 的 TTL。
10、系统函数(System Functions)
系统函数(System Functions)也叫内置函数(Built-in Functions),是在系统中预先实现好的功能模块。我们可以通过固定的函数名直接调用,实现想要的转换操作。Flink SQL提供了大量的系统函数,几乎支持所有的标准SQL中的操作,这为我们使用SQL编写流处理程序提供了极大的方便。
Flink SQL中的系统函数又主要可以分为两大类:标量函数(Scalar Functions)和聚合函数(Aggregate Functions)。
我们也可以通过下面的指令查看所有内置函数:
show functions;
10.1、标量函数(Scalar Functions)
标量函数指的就是只对输入数据做转换操作、返回一个值的函数。
标量函数是最常见、也最简单的一类系统函数,数量非常庞大,很多在标准SQL中也有定义。所以我们这里只对一些常见类型列举部分函数,做一个简单概述,具体应用可以查看官网的完整函数列表。
1)比较函数(Comparison Functions)
比较函数其实就是一个比较表达式,用来判断两个值之间的关系,返回一个布尔类型的值。这个比较表达式可以是用 <、>、= 等符号连接两个值,也可以是用关键字定义的某种判断。例如:
(1)value1 = value2 判断两个值相等;
(2)value1 <> value2 判断两个值不相等
(3)value IS NOT NULL 判断value不为空
2)逻辑函数(Logical Functions)
逻辑函数就是一个逻辑表达式,也就是用与(AND)、或(OR)、非(NOT)将布尔类型的值连接起来,也可以用判断语句(IS、IS NOT)进行真值判断;返回的还是一个布尔类型的值。例如:
(1)boolean1 OR boolean2 布尔值boolean1与布尔值boolean2取逻辑或
(2)boolean IS FALSE 判断布尔值boolean是否为false
(3)NOT boolean 布尔值boolean取逻辑非
3)算术函数(Arithmetic Functions)
进行算术计算的函数,包括用算术符号连接的运算,和复杂的数学运算。例如:
(1)numeric1 + numeric2 两数相加
(2)POWER(numeric1, numeric2) 幂运算,取数numeric1的numeric2次方
(3)RAND() 返回(0.0, 1.0)区间内的一个double类型的伪随机数
4)字符串函数(String Functions)
进行字符串处理的函数。例如:
(1)string1 || string2 两个字符串的连接
(2)UPPER(string) 将字符串string转为全部大写
(3)CHAR_LENGTH(string) 计算字符串string的长度
5)时间函数(Temporal Functions)
进行与时间相关操作的函数。例如:
(1)DATE string 按格式"yyyy-MM-dd"解析字符串string,返回类型为SQL Date
(2)TIMESTAMP string 按格式"yyyy-MM-dd HH:mm:ss[.SSS]"解析,返回类型为SQL timestamp
(3)CURRENT_TIME 返回本地时区的当前时间,类型为SQL time(与LOCALTIME等价)
(4)INTERVAL string range 返回一个时间间隔。
2)聚合函数(Aggregate Functions)
聚合函数是以表中多个行作为输入,提取字段进行聚合操作的函数,会将唯一的聚合值作为结果返回。聚合函数应用非常广泛,不论分组聚合、窗口聚合还是开窗(Over)聚合,对数据的聚合操作都可以用相同的函数来定义。
标准SQL中常见的聚合函数Flink SQL都是支持的,目前也在不断扩展,为流处理应用提供更强大的功能。例如:
(1)COUNT(*) 返回所有行的数量,统计个数。
(2)SUM([ ALL | DISTINCT ] expression) 对某个字段进行求和操作。默认情况下省略了关键字ALL,表示对所有行求和;如果指定DISTINCT,则会对数据进行去重,每个值只叠加一次。
(3)RANK() 返回当前值在一组值中的排名。
(4)ROW_NUMBER() 对一组值排序后,返回当前值的行号。
其中,RANK()和ROW_NUMBER()一般用在OVER窗口中。
11、Module 操作
Module 允许 Flink 扩展函数能力。它是可插拔的,Flink 官方本身已经提供了一些 Module,用户也可以编写自己的 Module。
目前 Flink 包含了以下三种 Module:
- CoreModule:CoreModule 是 Flink 内置的 Module,其包含了目前 Flink 内置的所有 UDF,Flink 默认开启的 Module 就是 CoreModule,我们可以直接使用其中的 UDF
- HiveModule:HiveModule 可以将 Hive 内置函数作为 Flink 的系统函数提供给 SQL\Table API 用户进行使用,比如 get_json_object 这类 Hive 内置函数(Flink 默认的 CoreModule 是没有的)
- 用户自定义 Module:用户可以实现 Module 接口实现自己的 UDF 扩展 Module
使用 LOAD 子句去加载 Flink SQL 体系内置的或者用户自定义的 Module,UNLOAD 子句去卸载 Flink SQL 体系内置的或者用户自定义的 Module。
我们可以查看当前启用的 module:
或者:
1)语法
-- 加载 LOAD MODULE module_name [WITH ('key1' = 'val1', 'key2' = 'val2', ...)] -- 卸载 UNLOAD MODULE module_name -- 查看 SHOW MODULES; SHOW FULL MODULES;
在 Flink 中,Module 可以被 加载、启用 、禁用 、卸载 Module,当加载Module 之后,默认就是开启的。同时支持多个 Module 的,并且根据加载 Module 的顺序去按顺序查找和解析 UDF,先查到的先解析使用。
此外,Flink 只会解析已经启用了的 Module。那么当两个 Module 中出现两个同名的函数且都启用时, Flink 会根据加载 Module 的顺序进行解析,结果就是会使用顺序为第一个的 Module 的 UDF,可以使用下面语法更改顺序:
use module hive,core;
USE是启用module,没有被use的为禁用(禁用不是卸载),除此之外还可以实现调整顺序的效果。上面的语句会将 Hive Module 设为第一个使用及解析的 Module。
2)案例
加载官方已经提供的的 Hive Module,将 Hive 已有的内置函数作为 Flink 的内置函数。需要先引入 hive 的 connector。其中包含了 flink 官方提供的一个 HiveModule。
1. 上传jar包到flink的lib中
上传hive connector
这里的 hive 版本要和我们导进去的 jar 包的hive版本一致,
# linux cp flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar /opt/module/flink-1.17.0/lib/
2. 重启 flink 集群和 sql-client
Flink SQL> load module hive with('hive-version'='3.1.3'); Flink SQL> show full modules; +-------------+------+ | module name | used | +-------------+------+ | core | TRUE | | hive | TRUE | +-------------+------+ 2 rows in set
我们查看所有函数:
Flink SQL> show functions;
我们发现,内置函数多了两百多个。
select split('hello,flink');
运行结果:
如果 hive 和 flink 有一个相同函数,一般会优先使用 flink 中的,但我们可以调整优先级。
use modules hive,core;