Flink(十四)【Flink SQL(中)查询】(3)

简介: Flink(十四)【Flink SQL(中)查询】

Flink(十四)【Flink SQL(中)查询】(2)https://developer.aliyun.com/article/1532320

7、Order by 和 limit

1order by

支持 Batch\Streaming,但在实时任务中一般用的非常少。

实时任务中,Order By 子句中必须要有时间属性字段,并且必须写在最前面且为升序。

-- 时间属性必须为升序asc 
select * from ws order by et,id desc;    - 我们指定id字段降序

运行结果:

2)limit

select * form ws limit 3;

运行结果:

8、SQL Hints

       翻译过来叫 SQL 暗示,确实语法就像我们 Java 中的注释一样。

       在执行查询时,可以在表名后面添加SQL Hints来临时修改表属性,对当前job生效。放我们定义好一张 Flink SQL 表的时候,如果要修改参数,比如数据生成器产生数据的速度,我们不可能每次都把表删了重建,那样太麻烦了。我们可以使用 SQL Hints,它主要修改的就是我们建表时 with的参数。

-- 修改数据生成器产生数据的速度
select * from ws1/*+ OPTIONS('rows-per-second'='10')*/;

9、集合操作

1)UNION 和 UNION ALL(合并)

       将两张表上下拼接在一起,要求必须字段必须一样,至少字段数量一样,而且每个位置的字段类型是一样的。

  • UNION:将集合合并并且去重
  • UNION ALL:将集合合并,不做去重。
select id from ws
union
select id from ws1;

运行结果:

可以看到结果是去重的,因为我们 id 的范围(ws: 1-3,ws1:3-5)。

select id from ws
union all
select id from ws1;

运行结果:

可以看到,union all 不去重。

2)Intersect Intersect All

  • Intersect:交集并且去重
  • Intersect ALL:交集不做去重
select id from ws
intersect
select id from ws1;

运行结果:

可以看到,运行结果就是我们两张表的共同 id 。

select id from ws
intersect all
select id from ws1;

运行结果:

可以看到,结果没有去重。

3)Except Except All

  • Except:差集并且去重
  • Except ALL:差集不做去重
select id from ws
except
select id from ws1;

运行结果:

可以看到,左边ws流中 id=3 的数据到了,但是右边ws1流中没有,于是认为是差值输出,直到ws1中出现了id=3的数据,发现这不是差值,于是撤回。

而且结果是相对于左边 ws 流的,也就是只会输出 ws 流中有而 ws1 没有的数据。

select id from ws
except all
select id from ws1;

运行结果:

       上述 SQL 在流式任务中,如果一条左流数据先来了,没有从右流集合数据中找到对应的数据时会直接输出,当右流对应数据后续来了之后,会下发回撤流将之前的数据給撤回。这也是一个回撤流。

4)In 子查询

  • In 子查询的结果集只能有一列
select id,vc from ws where id in (select id from ws1);

运行结果:

上述 SQL 的 In 子句和之前介绍到的 Inner Join 类似。并且 In 子查询也会涉及到大状态问题,要注意设置 State 的 TTL。

10、系统函数(System Functions)

       系统函数(System Functions)也叫内置函数(Built-in Functions),是在系统中预先实现好的功能模块。我们可以通过固定的函数名直接调用,实现想要的转换操作。Flink SQL提供了大量的系统函数,几乎支持所有的标准SQL中的操作,这为我们使用SQL编写流处理程序提供了极大的方便。

       Flink SQL中的系统函数又主要可以分为两大类:标量函数(Scalar Functions)和聚合函数(Aggregate Functions)。

建议直接去官网查看

我们也可以通过下面的指令查看所有内置函数:

show functions;

10.1、标量函数(Scalar Functions)

标量函数指的就是只对输入数据做转换操作、返回一个值的函数。

标量函数是最常见、也最简单的一类系统函数,数量非常庞大,很多在标准SQL中也有定义。所以我们这里只对一些常见类型列举部分函数,做一个简单概述,具体应用可以查看官网的完整函数列表。

1)比较函数(Comparison Functions)

比较函数其实就是一个比较表达式,用来判断两个值之间的关系,返回一个布尔类型的值。这个比较表达式可以是用 <、>、= 等符号连接两个值,也可以是用关键字定义的某种判断。例如:

(1)value1 = value2  判断两个值相等;

(2)value1 <> value2  判断两个值不相等

(3)value IS NOT NULL 判断value不为空

2)逻辑函数(Logical Functions)

逻辑函数就是一个逻辑表达式,也就是用与(AND)、或(OR)、非(NOT)将布尔类型的值连接起来,也可以用判断语句(IS、IS NOT)进行真值判断;返回的还是一个布尔类型的值。例如:

(1)boolean1 OR boolean2  布尔值boolean1与布尔值boolean2取逻辑或

(2)boolean IS FALSE  判断布尔值boolean是否为false

(3)NOT boolean  布尔值boolean取逻辑非

3)算术函数(Arithmetic Functions)

进行算术计算的函数,包括用算术符号连接的运算,和复杂的数学运算。例如:

(1)numeric1 + numeric2  两数相加

(2)POWER(numeric1, numeric2)  幂运算,取数numeric1的numeric2次方

(3)RAND()  返回(0.0, 1.0)区间内的一个double类型的伪随机数

4)字符串函数(String Functions)

进行字符串处理的函数。例如:

(1)string1 || string2  两个字符串的连接

(2)UPPER(string)  将字符串string转为全部大写

(3)CHAR_LENGTH(string)  计算字符串string的长度

5)时间函数(Temporal Functions)

进行与时间相关操作的函数。例如:

(1)DATE string  按格式"yyyy-MM-dd"解析字符串string,返回类型为SQL Date

(2)TIMESTAMP string  按格式"yyyy-MM-dd HH:mm:ss[.SSS]"解析,返回类型为SQL timestamp

(3)CURRENT_TIME  返回本地时区的当前时间,类型为SQL time(与LOCALTIME等价)

(4)INTERVAL string range  返回一个时间间隔。

2)聚合函数(Aggregate Functions)

聚合函数是以表中多个行作为输入,提取字段进行聚合操作的函数,会将唯一的聚合值作为结果返回。聚合函数应用非常广泛,不论分组聚合、窗口聚合还是开窗(Over)聚合,对数据的聚合操作都可以用相同的函数来定义。

标准SQL中常见的聚合函数Flink SQL都是支持的,目前也在不断扩展,为流处理应用提供更强大的功能。例如:

(1)COUNT(*)  返回所有行的数量,统计个数。

(2)SUM([ ALL | DISTINCT ] expression)  对某个字段进行求和操作。默认情况下省略了关键字ALL,表示对所有行求和;如果指定DISTINCT,则会对数据进行去重,每个值只叠加一次。

(3)RANK()   返回当前值在一组值中的排名。

(4)ROW_NUMBER()    对一组值排序后,返回当前值的行号。

其中,RANK()和ROW_NUMBER()一般用在OVER窗口中。

11、Module 操作

       Module 允许 Flink 扩展函数能力。它是可插拔的,Flink 官方本身已经提供了一些 Module,用户也可以编写自己的 Module。

目前 Flink 包含了以下三种 Module:

  • CoreModule:CoreModule 是 Flink 内置的 Module,其包含了目前 Flink 内置的所有 UDF,Flink 默认开启的 Module 就是 CoreModule,我们可以直接使用其中的 UDF
  • HiveModule:HiveModule 可以将 Hive 内置函数作为 Flink 的系统函数提供给 SQL\Table API 用户进行使用,比如 get_json_object 这类 Hive 内置函数(Flink 默认的 CoreModule 是没有的)
  • 用户自定义 Module:用户可以实现 Module 接口实现自己的 UDF 扩展 Module

       使用 LOAD 子句去加载 Flink SQL 体系内置的或者用户自定义的 Module,UNLOAD 子句去卸载 Flink SQL 体系内置的或者用户自定义的 Module。

我们可以查看当前启用的 module:

或者:

1)语法

-- 加载
LOAD MODULE module_name [WITH ('key1' = 'val1', 'key2' = 'val2', ...)]
-- 卸载
UNLOAD MODULE module_name
 
-- 查看
SHOW MODULES;
SHOW FULL MODULES;

       在 Flink 中,Module 可以被 加载、启用 、禁用 、卸载 Module,当加载Module 之后,默认就是开启的。同时支持多个 Module 的,并且根据加载 Module 的顺序去按顺序查找和解析 UDF,先查到的先解析使用。

       此外,Flink 只会解析已经启用了的 Module。那么当两个 Module 中出现两个同名的函数且都启用时, Flink 会根据加载 Module 的顺序进行解析,结果就是会使用顺序为第一个的 Module 的 UDF,可以使用下面语法更改顺序:

use module hive,core;

       USE是启用module,没有被use的为禁用(禁用不是卸载),除此之外还可以实现调整顺序的效果。上面的语句会将 Hive Module 设为第一个使用及解析的 Module。

2)案例

加载官方已经提供的的 Hive Module,将 Hive 已有的内置函数作为 Flink 的内置函数。需要先引入 hive 的 connector。其中包含了 flink 官方提供的一个 HiveModule。

1. 上传jar包到flink的lib中

上传hive connector

这里的 hive 版本要和我们导进去的 jar 包的hive版本一致,

# linux
cp flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar /opt/module/flink-1.17.0/lib/

2. 重启 flink 集群和 sql-client

Flink SQL> load module hive with('hive-version'='3.1.3');
Flink SQL> show full modules;
+-------------+------+
| module name | used |
+-------------+------+
|        core | TRUE |
|        hive | TRUE |
+-------------+------+
2 rows in set

我们查看所有函数:

Flink SQL> show functions;

我们发现,内置函数多了两百多个。

select split('hello,flink');

运行结果:

如果 hive 和 flink 有一个相同函数,一般会优先使用 flink 中的,但我们可以调整优先级。

use modules hive,core;


相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
8月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
1093 43
|
8月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
506 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
7月前
|
SQL 监控 关系型数据库
一键开启百倍加速!RDS DuckDB 黑科技让SQL查询速度最高提升200倍
RDS MySQL DuckDB分析实例结合事务处理与实时分析能力,显著提升SQL查询性能,最高可达200倍,兼容MySQL语法,无需额外学习成本。
|
7月前
|
SQL 存储 关系型数据库
MySQL体系结构详解:一条SQL查询的旅程
本文深入解析MySQL内部架构,从SQL查询的执行流程到性能优化技巧,涵盖连接建立、查询处理、执行阶段及存储引擎工作机制,帮助开发者理解MySQL运行原理并提升数据库性能。
|
7月前
|
SQL 监控 关系型数据库
SQL优化技巧:让MySQL查询快人一步
本文深入解析了MySQL查询优化的核心技巧,涵盖索引设计、查询重写、分页优化、批量操作、数据类型优化及性能监控等方面,帮助开发者显著提升数据库性能,解决慢查询问题,适用于高并发与大数据场景。
|
8月前
|
SQL XML Java
通过MyBatis的XML配置实现灵活的动态SQL查询
总结而言,通过MyBatis的XML配置实现灵活的动态SQL查询,可以让开发者以声明式的方式构建SQL语句,既保证了SQL操作的灵活性,又简化了代码的复杂度。这种方式可以显著提高数据库操作的效率和代码的可维护性。
490 18
|
6月前
|
SQL 关系型数据库 MySQL
(SQL)SQL语言中的查询语句整理
查询语句在sql中占了挺大一部分篇幅,因为在数据库中使用查询语句的次数远多于更新与删除命令。而查询语句比起其他语句要更加的复杂,可因为sql是数据库不可或缺的一部分,所以即使不懂,也必须得弄懂,以上。
366 0
|
9月前
|
SQL 消息中间件 Kafka
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是 Apache Flink 提供的 SQL 引擎,支持流批一体处理,统一操作流数据与批数据,具备高性能、低延迟、丰富数据源支持及标准 SQL 兼容性,适用于实时与离线数据分析。
1201 1
|
8月前
|
SQL 人工智能 数据库
【三桥君】如何正确使用SQL查询语句:避免常见错误?
三桥君解析了SQL查询中的常见错误和正确用法。AI产品专家三桥君通过三个典型案例:1)属性重复比较错误,应使用IN而非AND;2)WHERE子句中非法使用聚合函数的错误,应改用HAVING;3)正确的分组查询示例。三桥君还介绍了学生、课程和选课三个关系模式,并分析了SQL查询中的属性比较、聚合函数使用和分组查询等关键概念。最后通过实战练习帮助读者巩固知识,强调掌握这些技巧对提升数据库查询效率的重要性。
275 0
|
9月前
|
SQL
SQL中如何删除指定查询出来的数据
SQL中如何删除指定查询出来的数据