Flink(十四)【Flink SQL(中)查询】(3)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink(十四)【Flink SQL(中)查询】

Flink(十四)【Flink SQL(中)查询】(2)https://developer.aliyun.com/article/1532320

7、Order by 和 limit

1order by

支持 Batch\Streaming,但在实时任务中一般用的非常少。

实时任务中,Order By 子句中必须要有时间属性字段,并且必须写在最前面且为升序。

-- 时间属性必须为升序asc 
select * from ws order by et,id desc;    - 我们指定id字段降序

运行结果:

2)limit

select * form ws limit 3;

运行结果:

8、SQL Hints

       翻译过来叫 SQL 暗示,确实语法就像我们 Java 中的注释一样。

       在执行查询时,可以在表名后面添加SQL Hints来临时修改表属性,对当前job生效。放我们定义好一张 Flink SQL 表的时候,如果要修改参数,比如数据生成器产生数据的速度,我们不可能每次都把表删了重建,那样太麻烦了。我们可以使用 SQL Hints,它主要修改的就是我们建表时 with的参数。

-- 修改数据生成器产生数据的速度
select * from ws1/*+ OPTIONS('rows-per-second'='10')*/;

9、集合操作

1)UNION 和 UNION ALL(合并)

       将两张表上下拼接在一起,要求必须字段必须一样,至少字段数量一样,而且每个位置的字段类型是一样的。

  • UNION:将集合合并并且去重
  • UNION ALL:将集合合并,不做去重。
select id from ws
union
select id from ws1;

运行结果:

可以看到结果是去重的,因为我们 id 的范围(ws: 1-3,ws1:3-5)。

select id from ws
union all
select id from ws1;

运行结果:

可以看到,union all 不去重。

2)Intersect Intersect All

  • Intersect:交集并且去重
  • Intersect ALL:交集不做去重
select id from ws
intersect
select id from ws1;

运行结果:

可以看到,运行结果就是我们两张表的共同 id 。

select id from ws
intersect all
select id from ws1;

运行结果:

可以看到,结果没有去重。

3)Except Except All

  • Except:差集并且去重
  • Except ALL:差集不做去重
select id from ws
except
select id from ws1;

运行结果:

可以看到,左边ws流中 id=3 的数据到了,但是右边ws1流中没有,于是认为是差值输出,直到ws1中出现了id=3的数据,发现这不是差值,于是撤回。

而且结果是相对于左边 ws 流的,也就是只会输出 ws 流中有而 ws1 没有的数据。

select id from ws
except all
select id from ws1;

运行结果:

       上述 SQL 在流式任务中,如果一条左流数据先来了,没有从右流集合数据中找到对应的数据时会直接输出,当右流对应数据后续来了之后,会下发回撤流将之前的数据給撤回。这也是一个回撤流。

4)In 子查询

  • In 子查询的结果集只能有一列
select id,vc from ws where id in (select id from ws1);

运行结果:

上述 SQL 的 In 子句和之前介绍到的 Inner Join 类似。并且 In 子查询也会涉及到大状态问题,要注意设置 State 的 TTL。

10、系统函数(System Functions)

       系统函数(System Functions)也叫内置函数(Built-in Functions),是在系统中预先实现好的功能模块。我们可以通过固定的函数名直接调用,实现想要的转换操作。Flink SQL提供了大量的系统函数,几乎支持所有的标准SQL中的操作,这为我们使用SQL编写流处理程序提供了极大的方便。

       Flink SQL中的系统函数又主要可以分为两大类:标量函数(Scalar Functions)和聚合函数(Aggregate Functions)。

建议直接去官网查看

我们也可以通过下面的指令查看所有内置函数:

show functions;

10.1、标量函数(Scalar Functions)

标量函数指的就是只对输入数据做转换操作、返回一个值的函数。

标量函数是最常见、也最简单的一类系统函数,数量非常庞大,很多在标准SQL中也有定义。所以我们这里只对一些常见类型列举部分函数,做一个简单概述,具体应用可以查看官网的完整函数列表。

1)比较函数(Comparison Functions)

比较函数其实就是一个比较表达式,用来判断两个值之间的关系,返回一个布尔类型的值。这个比较表达式可以是用 <、>、= 等符号连接两个值,也可以是用关键字定义的某种判断。例如:

(1)value1 = value2  判断两个值相等;

(2)value1 <> value2  判断两个值不相等

(3)value IS NOT NULL 判断value不为空

2)逻辑函数(Logical Functions)

逻辑函数就是一个逻辑表达式,也就是用与(AND)、或(OR)、非(NOT)将布尔类型的值连接起来,也可以用判断语句(IS、IS NOT)进行真值判断;返回的还是一个布尔类型的值。例如:

(1)boolean1 OR boolean2  布尔值boolean1与布尔值boolean2取逻辑或

(2)boolean IS FALSE  判断布尔值boolean是否为false

(3)NOT boolean  布尔值boolean取逻辑非

3)算术函数(Arithmetic Functions)

进行算术计算的函数,包括用算术符号连接的运算,和复杂的数学运算。例如:

(1)numeric1 + numeric2  两数相加

(2)POWER(numeric1, numeric2)  幂运算,取数numeric1的numeric2次方

(3)RAND()  返回(0.0, 1.0)区间内的一个double类型的伪随机数

4)字符串函数(String Functions)

进行字符串处理的函数。例如:

(1)string1 || string2  两个字符串的连接

(2)UPPER(string)  将字符串string转为全部大写

(3)CHAR_LENGTH(string)  计算字符串string的长度

5)时间函数(Temporal Functions)

进行与时间相关操作的函数。例如:

(1)DATE string  按格式"yyyy-MM-dd"解析字符串string,返回类型为SQL Date

(2)TIMESTAMP string  按格式"yyyy-MM-dd HH:mm:ss[.SSS]"解析,返回类型为SQL timestamp

(3)CURRENT_TIME  返回本地时区的当前时间,类型为SQL time(与LOCALTIME等价)

(4)INTERVAL string range  返回一个时间间隔。

2)聚合函数(Aggregate Functions)

聚合函数是以表中多个行作为输入,提取字段进行聚合操作的函数,会将唯一的聚合值作为结果返回。聚合函数应用非常广泛,不论分组聚合、窗口聚合还是开窗(Over)聚合,对数据的聚合操作都可以用相同的函数来定义。

标准SQL中常见的聚合函数Flink SQL都是支持的,目前也在不断扩展,为流处理应用提供更强大的功能。例如:

(1)COUNT(*)  返回所有行的数量,统计个数。

(2)SUM([ ALL | DISTINCT ] expression)  对某个字段进行求和操作。默认情况下省略了关键字ALL,表示对所有行求和;如果指定DISTINCT,则会对数据进行去重,每个值只叠加一次。

(3)RANK()   返回当前值在一组值中的排名。

(4)ROW_NUMBER()    对一组值排序后,返回当前值的行号。

其中,RANK()和ROW_NUMBER()一般用在OVER窗口中。

11、Module 操作

       Module 允许 Flink 扩展函数能力。它是可插拔的,Flink 官方本身已经提供了一些 Module,用户也可以编写自己的 Module。

目前 Flink 包含了以下三种 Module:

  • CoreModule:CoreModule 是 Flink 内置的 Module,其包含了目前 Flink 内置的所有 UDF,Flink 默认开启的 Module 就是 CoreModule,我们可以直接使用其中的 UDF
  • HiveModule:HiveModule 可以将 Hive 内置函数作为 Flink 的系统函数提供给 SQL\Table API 用户进行使用,比如 get_json_object 这类 Hive 内置函数(Flink 默认的 CoreModule 是没有的)
  • 用户自定义 Module:用户可以实现 Module 接口实现自己的 UDF 扩展 Module

       使用 LOAD 子句去加载 Flink SQL 体系内置的或者用户自定义的 Module,UNLOAD 子句去卸载 Flink SQL 体系内置的或者用户自定义的 Module。

我们可以查看当前启用的 module:

或者:

1)语法

-- 加载
LOAD MODULE module_name [WITH ('key1' = 'val1', 'key2' = 'val2', ...)]
-- 卸载
UNLOAD MODULE module_name
 
-- 查看
SHOW MODULES;
SHOW FULL MODULES;

       在 Flink 中,Module 可以被 加载、启用 、禁用 、卸载 Module,当加载Module 之后,默认就是开启的。同时支持多个 Module 的,并且根据加载 Module 的顺序去按顺序查找和解析 UDF,先查到的先解析使用。

       此外,Flink 只会解析已经启用了的 Module。那么当两个 Module 中出现两个同名的函数且都启用时, Flink 会根据加载 Module 的顺序进行解析,结果就是会使用顺序为第一个的 Module 的 UDF,可以使用下面语法更改顺序:

use module hive,core;

       USE是启用module,没有被use的为禁用(禁用不是卸载),除此之外还可以实现调整顺序的效果。上面的语句会将 Hive Module 设为第一个使用及解析的 Module。

2)案例

加载官方已经提供的的 Hive Module,将 Hive 已有的内置函数作为 Flink 的内置函数。需要先引入 hive 的 connector。其中包含了 flink 官方提供的一个 HiveModule。

1. 上传jar包到flink的lib中

上传hive connector

这里的 hive 版本要和我们导进去的 jar 包的hive版本一致,

# linux
cp flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar /opt/module/flink-1.17.0/lib/

2. 重启 flink 集群和 sql-client

Flink SQL> load module hive with('hive-version'='3.1.3');
Flink SQL> show full modules;
+-------------+------+
| module name | used |
+-------------+------+
|        core | TRUE |
|        hive | TRUE |
+-------------+------+
2 rows in set

我们查看所有函数:

Flink SQL> show functions;

我们发现,内置函数多了两百多个。

select split('hello,flink');

运行结果:

如果 hive 和 flink 有一个相同函数,一般会优先使用 flink 中的,但我们可以调整优先级。

use modules hive,core;


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
8天前
|
SQL 存储 程序员
SQL查询的一些基本知识和学习指导
【6月更文挑战第17天】SQL查询核心包括基础选择、连接(JOIN)、子查询、聚合函数与GROUP BY、模糊匹配(LIKE)、分页与排序。JOIN操作连接多表,GROUP BY配合聚合函数做统计,LIKE用于模糊搜索。理解存储过程、触发器及自动增长列等进阶概念,通过实践提升SQL技能。
33 2
|
1天前
|
SQL 机器学习/深度学习 分布式计算
MaxCompute产品使用问题之如何调整改变SQL查询的严格性
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
1天前
|
SQL 分布式计算 大数据
MaxCompute产品使用问题之如果oss文件过大,如何在不调整oss源文件大小的情况下优化查询sql
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
13天前
|
SQL 存储 安全
SQL入门与进阶:数据库查询与管理的实用指南
一、引言 在数字化时代,数据库已经成为各行各业存储、管理和分析数据的关键基础设施
|
1天前
|
SQL 分布式计算 DataWorks
MaxCompute产品使用问题之如何通过临时查询功能来书写和运行SQL语句
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
3天前
|
SQL 前端开发 关系型数据库
零基础学习数据库SQL语句之查询表中数据的DQL语句
零基础学习数据库SQL语句之查询表中数据的DQL语句
5 0
|
7天前
|
SQL 关系型数据库 数据库
实时计算 Flink版操作报错合集之在本地执行代码没有问题,但是在线执行sql命令就会报错,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
7天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在执行SQL语句时遇到了类找不到,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
11天前
|
SQL
SQL查询
SQL查询
16 0
|
14天前
|
SQL 安全 数据库
SQL实践指南:从基础到进阶的数据库查询与管理
一、引言 在数据驱动的时代,数据库已成为各行各业不可或缺的一部分