Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(3)

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】

Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(2)https://developer.aliyun.com/article/1532276

5、 查询

5.1、 DataGen & Print

1)创建数据生成器源表

CREATE TABLE source (
 
    id INT,
 
    ts BIGINT,
 
    vc INT
 
) WITH (
    -- flink 自带的数据生成器
    'connector' = 'datagen',
    -- 每s生成的数据条数
    'rows-per-second'='1',
    -- 生成类型 sequence代表自增序列,需要指定起始值和结束值
    'fields.id.kind'='sequence',
    -- id字段自增起始值
    'fields.id.start'='1',
    -- id字段自增结束值
    'fields.id.end'='10000',
    -- ts字段的生成类型
    'fields.ts.kind'='sequence',
    'fields.ts.start'='1',
    'fields.ts.end'='1000000',
    -- vc字段类型 随机值
    'fields.vc.kind'='random',
    -- 最小值 1
    'fields.vc.min'='1',
    -- 最大值 100
    'fields.vc.max'='100'
 
);
 
CREATE TABLE sink (
    id INT,
    ts BIGINT,
    vc INT
) WITH (
connector' = 'print'
);

2)查询源表

查询数据:select * from source;

注意:如果发现刷新不动,就退出去查看一下log4j输出了什么警告,有的警告可以忽略,但是有的可能就是原因。比如我是因为没有在环境变量中添加 HADOOP_CONF_DIR ,导致我的数据生成器不生成数据。

我们可以看到结果显示模式是 table 模式,这是默认的显示模式,我们在前面的常用配置里讲过,还有一种 changelog 模式可以设置。

我们再次查询:

我们可以看到,这种模式下,它的显示比 table 模式多了一列 op ,代表操作,+I 代表新增数据,撤回就是 -U。

此外还有一种模式叫做 tableau:

可以看到,这种模式喜爱,我们不会进入那个专门的数据展示界面,更加方便。

select * from source;

3)插入sink表并查询

创建 Sink表:

我们试着把 source 中的数据输出到 sink:

insert into sink select * from source;

可以看到它给我们返回了 一个 Job Id,我们可以直接查询 sink 表,或者也可以在 Web UI 中查看:

select * from sink;

或者

5.2、With子句

       WITH提供了一种编写辅助语句的方法,以便在较大的查询中使用。这些语句通常被称为公共表表达式(Common Table Expression, CTE),可以认为它们定义了仅为一个查询而存在的临时视图。

1)语法

WITH <with_item_definition> [ , ... ]
 
SELECT ... FROM ...;
 
 
 
<with_item_defintion>:
 
    with_item_name (column_name[, ...n]) AS ( <select_query> )

2)案例

我们查询这个临时表就相当于执行了 with 内部的查询,比如下面:

WITH source_with_total AS (
    SELECT id, vc+10 AS total
    FROM source
)
-- 注意这里没有分号 这两个句子是一个作业里面的
SELECT id, SUM(total) FROM source_with_total GROUP BY id;

我们查询 source_with_total 就相当于查询了它内部的语句:select id,vc+10 as total from source;当然,我们在查这张临时表的时候可以选择字段。

需要注意的地方就是我们生成临时表的句子和查询临时表的句子是一个语句没有分号的,它们同属于一个作业,这个临时表只在这里生效,就像帮我们的查询语句简化了一下,作业结束它也就不存在了。我们完全可以写成这样:

select id,vc+10 as total from source;

5.3、SELECT & WHERE 子句

1)语法

SELECT select_list FROM table_expression [ WHERE boolean_expression ]

2)案例

-- 自定义 Source 的数据
-- 不需要给表 t 的字段显示添加类型(添加会报错) flink会自动识别
SELECT id, price FROM (VALUES (1, 2.0), (2, 3.1)) AS t (order_id, price);
 
SELECT vc + 10 FROM source WHERE id >10;

通过查询结果,我们可以知道id=10的这条数据它的 vc 是<=10 的。

5.4、SELECT DISTINCT 子句

用作根据 key 进行数据去重

SELECT DISTINCT vc FROM source;

       对于流查询,计算查询结果所需的状态可能无限增长。状态大小取决于不同行数。可以设置适当的状态生存时间(TTL)的查询配置,以防止状态过大。但是,这可能会影响查询结果的正确性。如某个 key 的数据过期从状态中删除了,那么下次再来这么一个 key,由于在状态中找不到,就又会输出一遍。

Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(4)https://developer.aliyun.com/article/1532280

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
791 43
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
286 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
5月前
|
SQL 消息中间件 Kafka
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是 Apache Flink 提供的 SQL 引擎,支持流批一体处理,统一操作流数据与批数据,具备高性能、低延迟、丰富数据源支持及标准 SQL 兼容性,适用于实时与离线数据分析。
874 1
|
7月前
|
SQL 关系型数据库 MySQL
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
Apache Flink CDC 3.4.0 版本正式发布!经过4个月的开发,此版本强化了对高频表结构变更的支持,新增 batch 执行模式和 Apache Iceberg Sink 连接器,可将数据库数据全增量实时写入 Iceberg 数据湖。51位贡献者完成了259次代码提交,优化了 MySQL、MongoDB 等连接器,并修复多个缺陷。未来 3.5 版本将聚焦脏数据处理、数据限流等能力及 AI 生态对接。欢迎下载体验并提出反馈!
1204 1
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
419 15
|
11月前
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
1930 27
|
12月前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
840 14
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
265 0
|
SQL 关系型数据库 MySQL
1.SQL语句基础之DDL
<div style="font-family:微软雅黑; font-size:14px; line-height:21px; widows:auto"><strong>SQL(Structured Query Language 结构化查询语言)</strong></div> <div style="font-family:微软雅黑; font-size:14px; line-heigh
1393 0
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")