Flink(十四)【Flink SQL(中)查询】(3)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
简介: Flink(十四)【Flink SQL(中)查询】

Flink(十四)【Flink SQL(中)查询】(2)https://developer.aliyun.com/article/1532320

7、Order by 和 limit

1order by

支持 Batch\Streaming,但在实时任务中一般用的非常少。

实时任务中,Order By 子句中必须要有时间属性字段,并且必须写在最前面且为升序。

-- 时间属性必须为升序asc 
select * from ws order by et,id desc;    - 我们指定id字段降序

运行结果:

2)limit

select * form ws limit 3;

运行结果:

8、SQL Hints

       翻译过来叫 SQL 暗示,确实语法就像我们 Java 中的注释一样。

       在执行查询时,可以在表名后面添加SQL Hints来临时修改表属性,对当前job生效。放我们定义好一张 Flink SQL 表的时候,如果要修改参数,比如数据生成器产生数据的速度,我们不可能每次都把表删了重建,那样太麻烦了。我们可以使用 SQL Hints,它主要修改的就是我们建表时 with的参数。

-- 修改数据生成器产生数据的速度
select * from ws1/*+ OPTIONS('rows-per-second'='10')*/;

9、集合操作

1)UNION 和 UNION ALL(合并)

       将两张表上下拼接在一起,要求必须字段必须一样,至少字段数量一样,而且每个位置的字段类型是一样的。

  • UNION:将集合合并并且去重
  • UNION ALL:将集合合并,不做去重。
select id from ws
union
select id from ws1;

运行结果:

可以看到结果是去重的,因为我们 id 的范围(ws: 1-3,ws1:3-5)。

select id from ws
union all
select id from ws1;

运行结果:

可以看到,union all 不去重。

2)Intersect Intersect All

  • Intersect:交集并且去重
  • Intersect ALL:交集不做去重
select id from ws
intersect
select id from ws1;

运行结果:

可以看到,运行结果就是我们两张表的共同 id 。

select id from ws
intersect all
select id from ws1;

运行结果:

可以看到,结果没有去重。

3)Except Except All

  • Except:差集并且去重
  • Except ALL:差集不做去重
select id from ws
except
select id from ws1;

运行结果:

可以看到,左边ws流中 id=3 的数据到了,但是右边ws1流中没有,于是认为是差值输出,直到ws1中出现了id=3的数据,发现这不是差值,于是撤回。

而且结果是相对于左边 ws 流的,也就是只会输出 ws 流中有而 ws1 没有的数据。

select id from ws
except all
select id from ws1;

运行结果:

       上述 SQL 在流式任务中,如果一条左流数据先来了,没有从右流集合数据中找到对应的数据时会直接输出,当右流对应数据后续来了之后,会下发回撤流将之前的数据給撤回。这也是一个回撤流。

4)In 子查询

  • In 子查询的结果集只能有一列
select id,vc from ws where id in (select id from ws1);

运行结果:

上述 SQL 的 In 子句和之前介绍到的 Inner Join 类似。并且 In 子查询也会涉及到大状态问题,要注意设置 State 的 TTL。

10、系统函数(System Functions)

       系统函数(System Functions)也叫内置函数(Built-in Functions),是在系统中预先实现好的功能模块。我们可以通过固定的函数名直接调用,实现想要的转换操作。Flink SQL提供了大量的系统函数,几乎支持所有的标准SQL中的操作,这为我们使用SQL编写流处理程序提供了极大的方便。

       Flink SQL中的系统函数又主要可以分为两大类:标量函数(Scalar Functions)和聚合函数(Aggregate Functions)。

建议直接去官网查看

我们也可以通过下面的指令查看所有内置函数:

show functions;

10.1、标量函数(Scalar Functions)

标量函数指的就是只对输入数据做转换操作、返回一个值的函数。

标量函数是最常见、也最简单的一类系统函数,数量非常庞大,很多在标准SQL中也有定义。所以我们这里只对一些常见类型列举部分函数,做一个简单概述,具体应用可以查看官网的完整函数列表。

1)比较函数(Comparison Functions)

比较函数其实就是一个比较表达式,用来判断两个值之间的关系,返回一个布尔类型的值。这个比较表达式可以是用 <、>、= 等符号连接两个值,也可以是用关键字定义的某种判断。例如:

(1)value1 = value2  判断两个值相等;

(2)value1 <> value2  判断两个值不相等

(3)value IS NOT NULL 判断value不为空

2)逻辑函数(Logical Functions)

逻辑函数就是一个逻辑表达式,也就是用与(AND)、或(OR)、非(NOT)将布尔类型的值连接起来,也可以用判断语句(IS、IS NOT)进行真值判断;返回的还是一个布尔类型的值。例如:

(1)boolean1 OR boolean2  布尔值boolean1与布尔值boolean2取逻辑或

(2)boolean IS FALSE  判断布尔值boolean是否为false

(3)NOT boolean  布尔值boolean取逻辑非

3)算术函数(Arithmetic Functions)

进行算术计算的函数,包括用算术符号连接的运算,和复杂的数学运算。例如:

(1)numeric1 + numeric2  两数相加

(2)POWER(numeric1, numeric2)  幂运算,取数numeric1的numeric2次方

(3)RAND()  返回(0.0, 1.0)区间内的一个double类型的伪随机数

4)字符串函数(String Functions)

进行字符串处理的函数。例如:

(1)string1 || string2  两个字符串的连接

(2)UPPER(string)  将字符串string转为全部大写

(3)CHAR_LENGTH(string)  计算字符串string的长度

5)时间函数(Temporal Functions)

进行与时间相关操作的函数。例如:

(1)DATE string  按格式"yyyy-MM-dd"解析字符串string,返回类型为SQL Date

(2)TIMESTAMP string  按格式"yyyy-MM-dd HH:mm:ss[.SSS]"解析,返回类型为SQL timestamp

(3)CURRENT_TIME  返回本地时区的当前时间,类型为SQL time(与LOCALTIME等价)

(4)INTERVAL string range  返回一个时间间隔。

2)聚合函数(Aggregate Functions)

聚合函数是以表中多个行作为输入,提取字段进行聚合操作的函数,会将唯一的聚合值作为结果返回。聚合函数应用非常广泛,不论分组聚合、窗口聚合还是开窗(Over)聚合,对数据的聚合操作都可以用相同的函数来定义。

标准SQL中常见的聚合函数Flink SQL都是支持的,目前也在不断扩展,为流处理应用提供更强大的功能。例如:

(1)COUNT(*)  返回所有行的数量,统计个数。

(2)SUM([ ALL | DISTINCT ] expression)  对某个字段进行求和操作。默认情况下省略了关键字ALL,表示对所有行求和;如果指定DISTINCT,则会对数据进行去重,每个值只叠加一次。

(3)RANK()   返回当前值在一组值中的排名。

(4)ROW_NUMBER()    对一组值排序后,返回当前值的行号。

其中,RANK()和ROW_NUMBER()一般用在OVER窗口中。

11、Module 操作

       Module 允许 Flink 扩展函数能力。它是可插拔的,Flink 官方本身已经提供了一些 Module,用户也可以编写自己的 Module。

目前 Flink 包含了以下三种 Module:

  • CoreModule:CoreModule 是 Flink 内置的 Module,其包含了目前 Flink 内置的所有 UDF,Flink 默认开启的 Module 就是 CoreModule,我们可以直接使用其中的 UDF
  • HiveModule:HiveModule 可以将 Hive 内置函数作为 Flink 的系统函数提供给 SQL\Table API 用户进行使用,比如 get_json_object 这类 Hive 内置函数(Flink 默认的 CoreModule 是没有的)
  • 用户自定义 Module:用户可以实现 Module 接口实现自己的 UDF 扩展 Module

       使用 LOAD 子句去加载 Flink SQL 体系内置的或者用户自定义的 Module,UNLOAD 子句去卸载 Flink SQL 体系内置的或者用户自定义的 Module。

我们可以查看当前启用的 module:

或者:

1)语法

-- 加载
LOAD MODULE module_name [WITH ('key1' = 'val1', 'key2' = 'val2', ...)]
-- 卸载
UNLOAD MODULE module_name
 
-- 查看
SHOW MODULES;
SHOW FULL MODULES;

       在 Flink 中,Module 可以被 加载、启用 、禁用 、卸载 Module,当加载Module 之后,默认就是开启的。同时支持多个 Module 的,并且根据加载 Module 的顺序去按顺序查找和解析 UDF,先查到的先解析使用。

       此外,Flink 只会解析已经启用了的 Module。那么当两个 Module 中出现两个同名的函数且都启用时, Flink 会根据加载 Module 的顺序进行解析,结果就是会使用顺序为第一个的 Module 的 UDF,可以使用下面语法更改顺序:

use module hive,core;

       USE是启用module,没有被use的为禁用(禁用不是卸载),除此之外还可以实现调整顺序的效果。上面的语句会将 Hive Module 设为第一个使用及解析的 Module。

2)案例

加载官方已经提供的的 Hive Module,将 Hive 已有的内置函数作为 Flink 的内置函数。需要先引入 hive 的 connector。其中包含了 flink 官方提供的一个 HiveModule。

1. 上传jar包到flink的lib中

上传hive connector

这里的 hive 版本要和我们导进去的 jar 包的hive版本一致,

# linux
cp flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar /opt/module/flink-1.17.0/lib/

2. 重启 flink 集群和 sql-client

Flink SQL> load module hive with('hive-version'='3.1.3');
Flink SQL> show full modules;
+-------------+------+
| module name | used |
+-------------+------+
|        core | TRUE |
|        hive | TRUE |
+-------------+------+
2 rows in set

我们查看所有函数:

Flink SQL> show functions;

我们发现,内置函数多了两百多个。

select split('hello,flink');

运行结果:

如果 hive 和 flink 有一个相同函数,一般会优先使用 flink 中的,但我们可以调整优先级。

use modules hive,core;


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
21天前
|
SQL 存储 人工智能
Vanna:开源 AI 检索生成框架,自动生成精确的 SQL 查询
Vanna 是一个开源的 Python RAG(Retrieval-Augmented Generation)框架,能够基于大型语言模型(LLMs)为数据库生成精确的 SQL 查询。Vanna 支持多种 LLMs、向量数据库和 SQL 数据库,提供高准确性查询,同时确保数据库内容安全私密,不外泄。
91 7
Vanna:开源 AI 检索生成框架,自动生成精确的 SQL 查询
|
8天前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
70 14
|
28天前
|
SQL Java
使用java在未知表字段情况下通过sql查询信息
使用java在未知表字段情况下通过sql查询信息
36 8
|
1月前
|
SQL 安全 PHP
PHP开发中防止SQL注入的方法,包括使用参数化查询、对用户输入进行过滤和验证、使用安全的框架和库等,旨在帮助开发者有效应对SQL注入这一常见安全威胁,保障应用安全
本文深入探讨了PHP开发中防止SQL注入的方法,包括使用参数化查询、对用户输入进行过滤和验证、使用安全的框架和库等,旨在帮助开发者有效应对SQL注入这一常见安全威胁,保障应用安全。
59 4
|
1月前
|
SQL 监控 关系型数据库
SQL语句当前及历史信息查询-performance schema的使用
本文介绍了如何使用MySQL的Performance Schema来获取SQL语句的当前和历史执行信息。Performance Schema默认在MySQL 8.0中启用,可以通过查询相关表来获取详细的SQL执行信息,包括当前执行的SQL、历史执行记录和统计汇总信息,从而快速定位和解决性能瓶颈。
|
1月前
|
SQL 存储 缓存
如何优化SQL查询性能?
【10月更文挑战第28天】如何优化SQL查询性能?
143 10
|
1月前
|
SQL 关系型数据库 MySQL
|
2月前
|
SQL 数据库 开发者
功能发布-自定义SQL查询
本期主要为大家介绍ClkLog九月上线的新功能-自定义SQL查询。
|
2月前
|
SQL 移动开发 Oracle
SQL语句实现查询连续六天数据的方法与技巧
在数据库查询中,有时需要筛选出符合特定时间连续性条件的数据记录
|
2月前
|
SQL Java 数据库连接
如何使用`DriverManager.getConnection()`连接数据库,并利用`PreparedStatement`执行参数化查询,有效防止SQL注入。
【10月更文挑战第6天】在代码与逻辑交织的世界中,我从一名数据库新手出发,通过不断探索与实践,最终成为熟练掌握JDBC的开发者。这段旅程充满挑战与惊喜,从建立数据库连接到执行SQL语句,再到理解事务管理和批处理等高级功能,每一步都让我对JDBC有了更深的认识。示例代码展示了如何使用`DriverManager.getConnection()`连接数据库,并利用`PreparedStatement`执行参数化查询,有效防止SQL注入。
130 5