Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(1)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】

前言

       最近在假期实训,但是实在水的不行,三天要学完SSM,实在一言难尽,浪费那时间干什么呢。SSM 之前学了一半,等后面忙完了,再去好好重学一遍,毕竟这玩意真是面试必会的东西。

       今天开始学习 Flink 最后一部分 Flink SQL ,完了还有不少框架得学:Kafka、Flume、ClickHouse、Hudi、Azkaban、OOzie ... 有的算是小工具,不费劲,但是学完得复习啊,这么多东西,必须赶紧做个小项目练练手。

Flink SQL

       到现在我们学完了底层API(也就是 process)和核心 API(这里由于Flink现在已经流批一体,所以我们只学习 DataStream就好了),然后就是剩下的 Table API(类似于 Spark 中的 DataFrame 和 DataSet)和SQL(类似于Spark SQL)。显然最上层的都是高级 API ,它们的底层还是我们学的这些 DataStream 和 process 算子。不过毕竟是高级 API ,它对 SQL 语句都进行了优化,一般能用 SQL 肯定没人愿意用繁琐的代码去实现,大大降低了开发 Flink 程序的难度,但是一些 SQL 实现不了的东西当然还是得底层核心 API 来实现,就像 Spark 中的 RDD 编程一样。

1、sql-client 准备

       为了方便练习演示 Flink SQL 语法,我们需要使用Flink 提供的 sql-client 进行操作。(类似于我们 Hive 中给的 hive 命令进入一个命令行模式就可以进行一些 hive sql 的操作)

1.1、基于 yarn-session 模式

1)启动 Flink
# 先启动 hadoop
myhadoop start
# 不需要启动 flink
/opt/module/flink-1.17.0/bin/yarn-session.sh -d

完了直接从 Yarn 的 Web UI 跳转到 Flink 的 Web UI

2)启动 sql-client
./sql-client.sh embedded -s yarn-session

1.2、常用配置

1)结果显示模式
#默认 table,还可以设置为 tableau、changelog
SET sql-client.execution.result-mode=changelog
2)执行环境
SET execution.runtime-mode=streaming; #默认是 streaming,也可以设置为为 batch
3)默认并行度
# 默认使用的是 flink 配置文件中的默认值
SET parallelism.default=1
4)设置状态TTL
SET table.exec.state.ttl=1000    #单位 ms
5)通过 sql 文件初始化

我们在

1. 创建 sql 文件

vim sql-client-init.sql
create database mydatabases;

2. 启动时,指定sql文件

./sql-client.sh embedded -s yarn-session -i sql-client-init.sql

2、流处理中的表

MySQL Flink
处理的数据对象 字段元祖的有界集合 字段元祖的无限序列
查询对数据的访问 可以访问到完整的数据输入 无法访问所有数据,必须持续等待流式输入
查询终止条件 生成固定大小的结果集后终止 永不停止,根据持续收到的数据不断更新查询结果(停不下来)

可以看到,关系型数据库 SQL 和我们 Flink SQL 的区别还是很大的。

2.1、动态表和持续查询

       流处理面对的数据是连续不断的,未知的,这就导致流处理中的“表”和我们熟悉的传统关系型数据库中的表完全不同;而基于表执行的查询操作,也就有了新的含义。

1)动态表(Dynamic Tables)

       简答来说,就是来一条数据,插入一行数据,我们的表随着数据的增加也不断扩大,所以叫动态表。

       当流中有新数据到来,初始的表中会插入一行;而基于这个表定义的SQL查询,就应该在之前的基础上更新结果。这样得到的表就会不断地动态变化,被称为“动态表”(Dynamic Tables)。

       动态表是Flink在Table API和SQL中的核心概念,它为流数据处理提供了表和SQL支持。我们所熟悉的表一般用来做批处理,面向的是固定的数据集,可以认为是“静态表”;而动态表则完全不同,它里面的数据会随时间变化。

2)持续查询(Continuous Query)

       动态表可以像静态的批处理表一样进行查询操作。由于数据在不断变化,因此基于它定义的SQL查询也不可能执行一次就得到最终结果。这样一来,我们对动态表的查询也就永远不会停止,一直在随着新数据的到来而继续执行。这样的查询就被称作“持续查询”(Continuous Query)。对动态表定义的查询操作,都是持续查询;而持续查询的结果也会是一个动态表。

       也就是说,对于 MySQL ,查询出来的数据是某一刻的数据,但是 Flink 的查询是停不下来的,它的查询结果是一直动态变化的,所以叫持续查询。

       由于每次数据到来都会触发查询操作,因此可以认为一次查询面对的数据集,就是当前输入动态表中收到的所有数据。这相当于是对输入动态表做了一个“快照”(snapshot),当作有限数据集进行批处理;流式数据的到来会触发连续不断的快照查询,像动画一样连贯起来,就构成了“持续查询”。

持续查询的步骤如下:

(1)流(stream)被转换为动态表(dynamic table);

(2)对动态表进行持续查询(continuous query),生成新的动态表;

(3)生成的动态表被转换成流。

这样,只要API将流和动态表的转换封装起来,我们就可以直接在数据流上执行SQL查询,用处理表的方式来做流处理了。

2.2、将流转换成动态表

       如果把流看作一张表,那么流中每个数据的到来,都应该看作是对表的一次插入(Insert)操作,会在表的末尾添加一行数据。因为流是连续不断的,而且之前的输出结果无法改变、只能在后面追加;所以我们其实是通过一个只有插入操作(insert-only)的更新日志(changelog)流,来构建一个表。

       例如,当用户点击事件到来时,就对应着动态表中的一次插入(Insert)操作,每条数据就是表中的一行;随着插入更多的点击事件,得到的动态表将不断增长。

2.3、用 SQL 持续查询

1)更新(Update)查询

我们在代码中定义了一个SQL查询。

Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user");

       当原始动态表不停地插入新的数据时,查询得到的urlCountTable会持续地进行更改。由于count数量可能会叠加增长,因此这里的更改操作可以是简单的插入(Insert),也可以是对之前数据的更新(Update)。这种持续查询被称为更新查询(Update Query),更新查询得到的结果表如果想要转换成DataStream,必须调用 toChangelogStream() 方法。

       我们新来数据按说是应该追加到后面的,但是这里的第三条数据 "Alice ./prod?id=1 5000"并没有追加到表的后面,而是把我们表的第一行也为 "Alice" 的那一行进行了修改,这就是更新流查询,它是通过回撤流实现的。

2)追加(Append)查询

       上面的例子中,查询过程用到了分组聚合,结果表中就会产生更新操作。如果我们执行一个简单的条件查询,结果表中就会像原始表EventTable一样,只有插入(Insert)操作了。

Table aliceVisitTable = tableEnv.sqlQuery("SELECT url, user FROM EventTable WHERE user = 'Cary'");

       这样的持续查询,就被称为追加查询(Append Query),它定义的结果表的更新日志(changelog)流中只有INSERT操作。

       由于窗口的统计结果是一次性写入结果表的,所以结果表的更新日志流中只会包含插入INSERT操作,而没有更新UPDATE操作。所以这里的持续查询,依然是一个追加(Append)查询。结果表result如果转换成DataStream,可以直接调用toDataStream()方法。

2.4、将动态表转换成流

       与关系型数据库中的表一样,动态表也可以通过插入(Insert)、更新(Update)和删除(Delete)操作,进行持续的更改。将动态表转换为流或将其写入外部系统时,就需要对这些更改操作进行编码,通过发送编码消息的方式告诉外部系统要执行的操作。在Flink中,Table API和SQL支持三种编码方式:

1. 仅追加(Append-only)流

       仅通过插入(Insert)更改来修改的动态表,可以直接转换为“仅追加”流。这个流中发出的数据,其实就是动态表中新增的每一行。

2. 撤回(Retract)流

       撤回流是包含两类消息的流,添加(add)消息撤回(retract)消息

       具体的编码规则是:INSERT插入操作编码为add消息;DELETE删除操作编码为retract消息;而UPDATE更新操作则编码为被更改行的retract消息,和更新后行(新行)的add消息(把旧的结果撤回,新的结果追加)。这样,我们可以通过编码后的消息指明所有的增删改操作,一个动态表就可以转换为撤回流了。

3. 更新插入(Upsert)流

       更新插入流中只包含两种类型的消息:更新插入(upsert)消息和删除(delete)消息。

       所谓的“upsert”其实是“update”和“insert”的合成词,所以对于更新插入流来说,INSERT插入操作和UPDATE更新操作,统一被编码为upsert消息;而DELETE删除操作则被编码为delete消息。

       可以看到,更新插入流比撤回流要精炼一点,直接一步到位。

       需要注意的是,在代码里将动态表转换为DataStream时,只支持仅追加(append-only)和撤回(retract)流,我们调用toChangelogStream()得到的其实就是撤回流。而连接到外部系统时,则可以支持不同的编码方法,这取决于外部系统本身的特性,也就是说而,更新插入(upsert)是取决于外部系统支持的

3、时间属性

       基于时间的操作(比如时间窗口),需要定义相关的时间语义和时间数据来源的信息。在Table API和SQL中,会给表单独提供一个逻辑上的时间字段,专门用来在表处理程序中指示时间。

       所以所谓的时间属性(time attributes),其实就是每个表模式结构(schema)的一部分。它可以在创建表的DDL里直接定义为一个字段,也可以在DataStream转换成表时定义。一旦定义了时间属性,它就可以作为一个普通字段引用,并且可以在基于时间的操作中使用。

       时间属性的数据类型必须为TIMESTAMP,它的行为类似于常规时间戳,可以直接访问并且进行计算。

       按照时间语义的不同,可以把时间属性的定义分成事件时间(event time)和处理时间(processing time)两种情况。都是固定写法,记住就完事了。

3.1、 事件时间

       事件时间属性可以在创建表DDL中定义,增加一个字段,通过WATERMARK语句来定义事件时间属性。具体定义方式如下:

CREATE TABLE EventTable(
  user STRING,
  url STRING,
  ts TIMESTAMP(3),
  # WATERMARK FOR 时间字段 AS ts - 时间间隔(必须用单引号) 时间单位
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  ...
);

这里我们把ts字段定义为事件时间属性,而且基于ts设置了5秒的水位线延迟。

时间戳类型必须是 TIMESTAMP 或者TIMESTAMP_LTZ 类型。但是时间戳一般都是秒或者是毫秒(BIGINT 类型),这种情况可以通过如下方式转换

ts BIGINT,
# 精确到miao后面3位,也就是ms
time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),

10.3.2 处理时间

       在定义处理时间属性时,必须要额外声明一个字段,专门用来保存当前的处理时间。

       在创建表的DDL(CREATE TABLE语句)中,可以增加一个额外的字段,通过调用系统内置的PROCTIME()函数来指定当前的处理时间属性。

CREATE TABLE EventTable(
  user STRING,
  url STRING,
  ts AS PROCTIME()
) WITH (
  ...
);

4、DDL(Data Definition Language)数据定义

4.1、 数据库

1)创建数据库

(1)语法

CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name
 
  [COMMENT database_comment]
 
 
  WITH (key1=val1, key2=val2, ...)

2)查询数据库

# 查询所有数据库
 
SHOW DATABASES;
 
# 查询当前数据库
 
SHOW CURRENT DATABASE;

3)修改数据库

ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...);

4)删除数据库

DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]
  • RESTRICT删除非空数据库会触发异常。默认启用
  • CASCADE删除非空数据库也会删除所有相关的表和函数。

5)切换当前数据库

USE database_name;

Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(2)https://developer.aliyun.com/article/1532276

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
21天前
|
SQL 存储 人工智能
Vanna:开源 AI 检索生成框架,自动生成精确的 SQL 查询
Vanna 是一个开源的 Python RAG(Retrieval-Augmented Generation)框架,能够基于大型语言模型(LLMs)为数据库生成精确的 SQL 查询。Vanna 支持多种 LLMs、向量数据库和 SQL 数据库,提供高准确性查询,同时确保数据库内容安全私密,不外泄。
91 7
Vanna:开源 AI 检索生成框架,自动生成精确的 SQL 查询
|
8天前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
70 14
|
28天前
|
SQL Java
使用java在未知表字段情况下通过sql查询信息
使用java在未知表字段情况下通过sql查询信息
36 8
|
1月前
|
SQL 安全 PHP
PHP开发中防止SQL注入的方法,包括使用参数化查询、对用户输入进行过滤和验证、使用安全的框架和库等,旨在帮助开发者有效应对SQL注入这一常见安全威胁,保障应用安全
本文深入探讨了PHP开发中防止SQL注入的方法,包括使用参数化查询、对用户输入进行过滤和验证、使用安全的框架和库等,旨在帮助开发者有效应对SQL注入这一常见安全威胁,保障应用安全。
59 4
|
1月前
|
SQL 监控 关系型数据库
SQL语句当前及历史信息查询-performance schema的使用
本文介绍了如何使用MySQL的Performance Schema来获取SQL语句的当前和历史执行信息。Performance Schema默认在MySQL 8.0中启用,可以通过查询相关表来获取详细的SQL执行信息,包括当前执行的SQL、历史执行记录和统计汇总信息,从而快速定位和解决性能瓶颈。
|
1月前
|
SQL 存储 缓存
如何优化SQL查询性能?
【10月更文挑战第28天】如何优化SQL查询性能?
143 10
|
1月前
|
SQL 关系型数据库 MySQL
|
2月前
|
SQL 数据库 开发者
功能发布-自定义SQL查询
本期主要为大家介绍ClkLog九月上线的新功能-自定义SQL查询。
|
1月前
|
SQL 关系型数据库 MySQL
mysql编写sql脚本:要求表没有主键,但是想查询没有相同值的时候才进行插入
mysql编写sql脚本:要求表没有主键,但是想查询没有相同值的时候才进行插入
35 0
|
2月前
|
SQL 数据可视化 BI
SQL语句及查询结果解析:技巧与方法
在数据库管理和数据分析中,SQL语句扮演着至关重要的角色