Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(1)

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】

前言

       最近在假期实训,但是实在水的不行,三天要学完SSM,实在一言难尽,浪费那时间干什么呢。SSM 之前学了一半,等后面忙完了,再去好好重学一遍,毕竟这玩意真是面试必会的东西。

       今天开始学习 Flink 最后一部分 Flink SQL ,完了还有不少框架得学:Kafka、Flume、ClickHouse、Hudi、Azkaban、OOzie ... 有的算是小工具,不费劲,但是学完得复习啊,这么多东西,必须赶紧做个小项目练练手。

Flink SQL

       到现在我们学完了底层API(也就是 process)和核心 API(这里由于Flink现在已经流批一体,所以我们只学习 DataStream就好了),然后就是剩下的 Table API(类似于 Spark 中的 DataFrame 和 DataSet)和SQL(类似于Spark SQL)。显然最上层的都是高级 API ,它们的底层还是我们学的这些 DataStream 和 process 算子。不过毕竟是高级 API ,它对 SQL 语句都进行了优化,一般能用 SQL 肯定没人愿意用繁琐的代码去实现,大大降低了开发 Flink 程序的难度,但是一些 SQL 实现不了的东西当然还是得底层核心 API 来实现,就像 Spark 中的 RDD 编程一样。

1、sql-client 准备

       为了方便练习演示 Flink SQL 语法,我们需要使用Flink 提供的 sql-client 进行操作。(类似于我们 Hive 中给的 hive 命令进入一个命令行模式就可以进行一些 hive sql 的操作)

1.1、基于 yarn-session 模式

1)启动 Flink
# 先启动 hadoop
myhadoop start
# 不需要启动 flink
/opt/module/flink-1.17.0/bin/yarn-session.sh -d

完了直接从 Yarn 的 Web UI 跳转到 Flink 的 Web UI

2)启动 sql-client
./sql-client.sh embedded -s yarn-session

1.2、常用配置

1)结果显示模式
#默认 table,还可以设置为 tableau、changelog
SET sql-client.execution.result-mode=changelog
2)执行环境
SET execution.runtime-mode=streaming; #默认是 streaming,也可以设置为为 batch
3)默认并行度
# 默认使用的是 flink 配置文件中的默认值
SET parallelism.default=1
4)设置状态TTL
SET table.exec.state.ttl=1000    #单位 ms
5)通过 sql 文件初始化

我们在

1. 创建 sql 文件

vim sql-client-init.sql
create database mydatabases;

2. 启动时,指定sql文件

./sql-client.sh embedded -s yarn-session -i sql-client-init.sql

2、流处理中的表

MySQL Flink
处理的数据对象 字段元祖的有界集合 字段元祖的无限序列
查询对数据的访问 可以访问到完整的数据输入 无法访问所有数据,必须持续等待流式输入
查询终止条件 生成固定大小的结果集后终止 永不停止,根据持续收到的数据不断更新查询结果(停不下来)

可以看到,关系型数据库 SQL 和我们 Flink SQL 的区别还是很大的。

2.1、动态表和持续查询

       流处理面对的数据是连续不断的,未知的,这就导致流处理中的“表”和我们熟悉的传统关系型数据库中的表完全不同;而基于表执行的查询操作,也就有了新的含义。

1)动态表(Dynamic Tables)

       简答来说,就是来一条数据,插入一行数据,我们的表随着数据的增加也不断扩大,所以叫动态表。

       当流中有新数据到来,初始的表中会插入一行;而基于这个表定义的SQL查询,就应该在之前的基础上更新结果。这样得到的表就会不断地动态变化,被称为“动态表”(Dynamic Tables)。

       动态表是Flink在Table API和SQL中的核心概念,它为流数据处理提供了表和SQL支持。我们所熟悉的表一般用来做批处理,面向的是固定的数据集,可以认为是“静态表”;而动态表则完全不同,它里面的数据会随时间变化。

2)持续查询(Continuous Query)

       动态表可以像静态的批处理表一样进行查询操作。由于数据在不断变化,因此基于它定义的SQL查询也不可能执行一次就得到最终结果。这样一来,我们对动态表的查询也就永远不会停止,一直在随着新数据的到来而继续执行。这样的查询就被称作“持续查询”(Continuous Query)。对动态表定义的查询操作,都是持续查询;而持续查询的结果也会是一个动态表。

       也就是说,对于 MySQL ,查询出来的数据是某一刻的数据,但是 Flink 的查询是停不下来的,它的查询结果是一直动态变化的,所以叫持续查询。

       由于每次数据到来都会触发查询操作,因此可以认为一次查询面对的数据集,就是当前输入动态表中收到的所有数据。这相当于是对输入动态表做了一个“快照”(snapshot),当作有限数据集进行批处理;流式数据的到来会触发连续不断的快照查询,像动画一样连贯起来,就构成了“持续查询”。

持续查询的步骤如下:

(1)流(stream)被转换为动态表(dynamic table);

(2)对动态表进行持续查询(continuous query),生成新的动态表;

(3)生成的动态表被转换成流。

这样,只要API将流和动态表的转换封装起来,我们就可以直接在数据流上执行SQL查询,用处理表的方式来做流处理了。

2.2、将流转换成动态表

       如果把流看作一张表,那么流中每个数据的到来,都应该看作是对表的一次插入(Insert)操作,会在表的末尾添加一行数据。因为流是连续不断的,而且之前的输出结果无法改变、只能在后面追加;所以我们其实是通过一个只有插入操作(insert-only)的更新日志(changelog)流,来构建一个表。

       例如,当用户点击事件到来时,就对应着动态表中的一次插入(Insert)操作,每条数据就是表中的一行;随着插入更多的点击事件,得到的动态表将不断增长。

2.3、用 SQL 持续查询

1)更新(Update)查询

我们在代码中定义了一个SQL查询。

Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user");

       当原始动态表不停地插入新的数据时,查询得到的urlCountTable会持续地进行更改。由于count数量可能会叠加增长,因此这里的更改操作可以是简单的插入(Insert),也可以是对之前数据的更新(Update)。这种持续查询被称为更新查询(Update Query),更新查询得到的结果表如果想要转换成DataStream,必须调用 toChangelogStream() 方法。

       我们新来数据按说是应该追加到后面的,但是这里的第三条数据 "Alice ./prod?id=1 5000"并没有追加到表的后面,而是把我们表的第一行也为 "Alice" 的那一行进行了修改,这就是更新流查询,它是通过回撤流实现的。

2)追加(Append)查询

       上面的例子中,查询过程用到了分组聚合,结果表中就会产生更新操作。如果我们执行一个简单的条件查询,结果表中就会像原始表EventTable一样,只有插入(Insert)操作了。

Table aliceVisitTable = tableEnv.sqlQuery("SELECT url, user FROM EventTable WHERE user = 'Cary'");

       这样的持续查询,就被称为追加查询(Append Query),它定义的结果表的更新日志(changelog)流中只有INSERT操作。

       由于窗口的统计结果是一次性写入结果表的,所以结果表的更新日志流中只会包含插入INSERT操作,而没有更新UPDATE操作。所以这里的持续查询,依然是一个追加(Append)查询。结果表result如果转换成DataStream,可以直接调用toDataStream()方法。

2.4、将动态表转换成流

       与关系型数据库中的表一样,动态表也可以通过插入(Insert)、更新(Update)和删除(Delete)操作,进行持续的更改。将动态表转换为流或将其写入外部系统时,就需要对这些更改操作进行编码,通过发送编码消息的方式告诉外部系统要执行的操作。在Flink中,Table API和SQL支持三种编码方式:

1. 仅追加(Append-only)流

       仅通过插入(Insert)更改来修改的动态表,可以直接转换为“仅追加”流。这个流中发出的数据,其实就是动态表中新增的每一行。

2. 撤回(Retract)流

       撤回流是包含两类消息的流,添加(add)消息撤回(retract)消息

       具体的编码规则是:INSERT插入操作编码为add消息;DELETE删除操作编码为retract消息;而UPDATE更新操作则编码为被更改行的retract消息,和更新后行(新行)的add消息(把旧的结果撤回,新的结果追加)。这样,我们可以通过编码后的消息指明所有的增删改操作,一个动态表就可以转换为撤回流了。

3. 更新插入(Upsert)流

       更新插入流中只包含两种类型的消息:更新插入(upsert)消息和删除(delete)消息。

       所谓的“upsert”其实是“update”和“insert”的合成词,所以对于更新插入流来说,INSERT插入操作和UPDATE更新操作,统一被编码为upsert消息;而DELETE删除操作则被编码为delete消息。

       可以看到,更新插入流比撤回流要精炼一点,直接一步到位。

       需要注意的是,在代码里将动态表转换为DataStream时,只支持仅追加(append-only)和撤回(retract)流,我们调用toChangelogStream()得到的其实就是撤回流。而连接到外部系统时,则可以支持不同的编码方法,这取决于外部系统本身的特性,也就是说而,更新插入(upsert)是取决于外部系统支持的

3、时间属性

       基于时间的操作(比如时间窗口),需要定义相关的时间语义和时间数据来源的信息。在Table API和SQL中,会给表单独提供一个逻辑上的时间字段,专门用来在表处理程序中指示时间。

       所以所谓的时间属性(time attributes),其实就是每个表模式结构(schema)的一部分。它可以在创建表的DDL里直接定义为一个字段,也可以在DataStream转换成表时定义。一旦定义了时间属性,它就可以作为一个普通字段引用,并且可以在基于时间的操作中使用。

       时间属性的数据类型必须为TIMESTAMP,它的行为类似于常规时间戳,可以直接访问并且进行计算。

       按照时间语义的不同,可以把时间属性的定义分成事件时间(event time)和处理时间(processing time)两种情况。都是固定写法,记住就完事了。

3.1、 事件时间

       事件时间属性可以在创建表DDL中定义,增加一个字段,通过WATERMARK语句来定义事件时间属性。具体定义方式如下:

CREATE TABLE EventTable(
  user STRING,
  url STRING,
  ts TIMESTAMP(3),
  # WATERMARK FOR 时间字段 AS ts - 时间间隔(必须用单引号) 时间单位
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  ...
);

这里我们把ts字段定义为事件时间属性,而且基于ts设置了5秒的水位线延迟。

时间戳类型必须是 TIMESTAMP 或者TIMESTAMP_LTZ 类型。但是时间戳一般都是秒或者是毫秒(BIGINT 类型),这种情况可以通过如下方式转换

ts BIGINT,
# 精确到miao后面3位,也就是ms
time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),

10.3.2 处理时间

       在定义处理时间属性时,必须要额外声明一个字段,专门用来保存当前的处理时间。

       在创建表的DDL(CREATE TABLE语句)中,可以增加一个额外的字段,通过调用系统内置的PROCTIME()函数来指定当前的处理时间属性。

CREATE TABLE EventTable(
  user STRING,
  url STRING,
  ts AS PROCTIME()
) WITH (
  ...
);

4、DDL(Data Definition Language)数据定义

4.1、 数据库

1)创建数据库

(1)语法

CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name
 
  [COMMENT database_comment]
 
 
  WITH (key1=val1, key2=val2, ...)

2)查询数据库

# 查询所有数据库
 
SHOW DATABASES;
 
# 查询当前数据库
 
SHOW CURRENT DATABASE;

3)修改数据库

ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...);

4)删除数据库

DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]
  • RESTRICT删除非空数据库会触发异常。默认启用
  • CASCADE删除非空数据库也会删除所有相关的表和函数。

5)切换当前数据库

USE database_name;

Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(2)https://developer.aliyun.com/article/1532276

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
791 43
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
286 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
3月前
|
SQL 监控 关系型数据库
一键开启百倍加速!RDS DuckDB 黑科技让SQL查询速度最高提升200倍
RDS MySQL DuckDB分析实例结合事务处理与实时分析能力,显著提升SQL查询性能,最高可达200倍,兼容MySQL语法,无需额外学习成本。
|
3月前
|
SQL 存储 关系型数据库
MySQL体系结构详解:一条SQL查询的旅程
本文深入解析MySQL内部架构,从SQL查询的执行流程到性能优化技巧,涵盖连接建立、查询处理、执行阶段及存储引擎工作机制,帮助开发者理解MySQL运行原理并提升数据库性能。
|
3月前
|
SQL 监控 关系型数据库
SQL优化技巧:让MySQL查询快人一步
本文深入解析了MySQL查询优化的核心技巧,涵盖索引设计、查询重写、分页优化、批量操作、数据类型优化及性能监控等方面,帮助开发者显著提升数据库性能,解决慢查询问题,适用于高并发与大数据场景。
|
2月前
|
SQL 关系型数据库 MySQL
(SQL)SQL语言中的查询语句整理
查询语句在sql中占了挺大一部分篇幅,因为在数据库中使用查询语句的次数远多于更新与删除命令。而查询语句比起其他语句要更加的复杂,可因为sql是数据库不可或缺的一部分,所以即使不懂,也必须得弄懂,以上。
219 0
|
4月前
|
SQL XML Java
通过MyBatis的XML配置实现灵活的动态SQL查询
总结而言,通过MyBatis的XML配置实现灵活的动态SQL查询,可以让开发者以声明式的方式构建SQL语句,既保证了SQL操作的灵活性,又简化了代码的复杂度。这种方式可以显著提高数据库操作的效率和代码的可维护性。
286 18
|
5月前
|
SQL 消息中间件 Kafka
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是 Apache Flink 提供的 SQL 引擎,支持流批一体处理,统一操作流数据与批数据,具备高性能、低延迟、丰富数据源支持及标准 SQL 兼容性,适用于实时与离线数据分析。
874 1
|
4月前
|
SQL 人工智能 数据库
【三桥君】如何正确使用SQL查询语句:避免常见错误?
三桥君解析了SQL查询中的常见错误和正确用法。AI产品专家三桥君通过三个典型案例:1)属性重复比较错误,应使用IN而非AND;2)WHERE子句中非法使用聚合函数的错误,应改用HAVING;3)正确的分组查询示例。三桥君还介绍了学生、课程和选课三个关系模式,并分析了SQL查询中的属性比较、聚合函数使用和分组查询等关键概念。最后通过实战练习帮助读者巩固知识,强调掌握这些技巧对提升数据库查询效率的重要性。
148 0
|
4月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
482 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄

热门文章

最新文章