Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(3)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】

Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(2)https://developer.aliyun.com/article/1532276

5、 查询

5.1、 DataGen & Print

1)创建数据生成器源表

CREATE TABLE source (
 
    id INT,
 
    ts BIGINT,
 
    vc INT
 
) WITH (
    -- flink 自带的数据生成器
    'connector' = 'datagen',
    -- 每s生成的数据条数
    'rows-per-second'='1',
    -- 生成类型 sequence代表自增序列,需要指定起始值和结束值
    'fields.id.kind'='sequence',
    -- id字段自增起始值
    'fields.id.start'='1',
    -- id字段自增结束值
    'fields.id.end'='10000',
    -- ts字段的生成类型
    'fields.ts.kind'='sequence',
    'fields.ts.start'='1',
    'fields.ts.end'='1000000',
    -- vc字段类型 随机值
    'fields.vc.kind'='random',
    -- 最小值 1
    'fields.vc.min'='1',
    -- 最大值 100
    'fields.vc.max'='100'
 
);
 
CREATE TABLE sink (
    id INT,
    ts BIGINT,
    vc INT
) WITH (
connector' = 'print'
);

2)查询源表

查询数据:select * from source;

注意:如果发现刷新不动,就退出去查看一下log4j输出了什么警告,有的警告可以忽略,但是有的可能就是原因。比如我是因为没有在环境变量中添加 HADOOP_CONF_DIR ,导致我的数据生成器不生成数据。

我们可以看到结果显示模式是 table 模式,这是默认的显示模式,我们在前面的常用配置里讲过,还有一种 changelog 模式可以设置。

我们再次查询:

我们可以看到,这种模式下,它的显示比 table 模式多了一列 op ,代表操作,+I 代表新增数据,撤回就是 -U。

此外还有一种模式叫做 tableau:

可以看到,这种模式喜爱,我们不会进入那个专门的数据展示界面,更加方便。

select * from source;

3)插入sink表并查询

创建 Sink表:

我们试着把 source 中的数据输出到 sink:

insert into sink select * from source;

可以看到它给我们返回了 一个 Job Id,我们可以直接查询 sink 表,或者也可以在 Web UI 中查看:

select * from sink;

或者

5.2、With子句

       WITH提供了一种编写辅助语句的方法,以便在较大的查询中使用。这些语句通常被称为公共表表达式(Common Table Expression, CTE),可以认为它们定义了仅为一个查询而存在的临时视图。

1)语法

WITH <with_item_definition> [ , ... ]
 
SELECT ... FROM ...;
 
 
 
<with_item_defintion>:
 
    with_item_name (column_name[, ...n]) AS ( <select_query> )

2)案例

我们查询这个临时表就相当于执行了 with 内部的查询,比如下面:

WITH source_with_total AS (
    SELECT id, vc+10 AS total
    FROM source
)
-- 注意这里没有分号 这两个句子是一个作业里面的
SELECT id, SUM(total) FROM source_with_total GROUP BY id;

我们查询 source_with_total 就相当于查询了它内部的语句:select id,vc+10 as total from source;当然,我们在查这张临时表的时候可以选择字段。

需要注意的地方就是我们生成临时表的句子和查询临时表的句子是一个语句没有分号的,它们同属于一个作业,这个临时表只在这里生效,就像帮我们的查询语句简化了一下,作业结束它也就不存在了。我们完全可以写成这样:

select id,vc+10 as total from source;

5.3、SELECT & WHERE 子句

1)语法

SELECT select_list FROM table_expression [ WHERE boolean_expression ]

2)案例

-- 自定义 Source 的数据
-- 不需要给表 t 的字段显示添加类型(添加会报错) flink会自动识别
SELECT id, price FROM (VALUES (1, 2.0), (2, 3.1)) AS t (order_id, price);
 
SELECT vc + 10 FROM source WHERE id >10;

通过查询结果,我们可以知道id=10的这条数据它的 vc 是<=10 的。

5.4、SELECT DISTINCT 子句

用作根据 key 进行数据去重

SELECT DISTINCT vc FROM source;

       对于流查询,计算查询结果所需的状态可能无限增长。状态大小取决于不同行数。可以设置适当的状态生存时间(TTL)的查询配置,以防止状态过大。但是,这可能会影响查询结果的正确性。如某个 key 的数据过期从状态中删除了,那么下次再来这么一个 key,由于在状态中找不到,就又会输出一遍。

Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(4)https://developer.aliyun.com/article/1532280

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
SQL 安全 数据库
如何在Django中正确使用参数化查询或ORM来避免SQL注入漏洞?
如何在Django中正确使用参数化查询或ORM来避免SQL注入漏洞?
141 77
|
27天前
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
164 26
|
26天前
|
SQL NoSQL Java
Java使用sql查询mongodb
通过MongoDB Atlas Data Lake或Apache Drill,可以在Java中使用SQL语法查询MongoDB数据。这两种方法都需要适当的配置和依赖库的支持。希望本文提供的示例和说明能够帮助开发者实现这一目标。
45 17
|
21天前
|
SQL Oracle 关系型数据库
如何在 Oracle 中配置和使用 SQL Profiles 来优化查询性能?
在 Oracle 数据库中,SQL Profiles 是优化查询性能的工具,通过提供额外统计信息帮助生成更有效的执行计划。配置和使用步骤包括:1. 启用自动 SQL 调优;2. 手动创建 SQL Profile,涉及收集、执行调优任务、查看报告及应用建议;3. 验证效果;4. 使用 `DBA_SQL_PROFILES` 视图管理 Profile。
|
28天前
|
SQL Java 数据库连接
【潜意识Java】MyBatis中的动态SQL灵活、高效的数据库查询以及深度总结
本文详细介绍了MyBatis中的动态SQL功能,涵盖其背景、应用场景及实现方式。
91 6
|
2月前
|
SQL 存储 人工智能
Vanna:开源 AI 检索生成框架,自动生成精确的 SQL 查询
Vanna 是一个开源的 Python RAG(Retrieval-Augmented Generation)框架,能够基于大型语言模型(LLMs)为数据库生成精确的 SQL 查询。Vanna 支持多种 LLMs、向量数据库和 SQL 数据库,提供高准确性查询,同时确保数据库内容安全私密,不外泄。
379 7
Vanna:开源 AI 检索生成框架,自动生成精确的 SQL 查询
|
2月前
|
SQL NoSQL Java
Java使用sql查询mongodb
通过使用 MongoDB Connector for BI 和 JDBC,开发者可以在 Java 中使用 SQL 语法查询 MongoDB 数据库。这种方法对于熟悉 SQL 的团队非常有帮助,能够快速实现对 MongoDB 数据的操作。同时,也需要注意到这种方法的性能和功能限制,根据具体应用场景进行选择和优化。
109 9
|
2月前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
178 14
|
5月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
3月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
1734 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎