Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(3)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】

Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(2)https://developer.aliyun.com/article/1532276

5、 查询

5.1、 DataGen & Print

1)创建数据生成器源表

CREATE TABLE source (
 
    id INT,
 
    ts BIGINT,
 
    vc INT
 
) WITH (
    -- flink 自带的数据生成器
    'connector' = 'datagen',
    -- 每s生成的数据条数
    'rows-per-second'='1',
    -- 生成类型 sequence代表自增序列,需要指定起始值和结束值
    'fields.id.kind'='sequence',
    -- id字段自增起始值
    'fields.id.start'='1',
    -- id字段自增结束值
    'fields.id.end'='10000',
    -- ts字段的生成类型
    'fields.ts.kind'='sequence',
    'fields.ts.start'='1',
    'fields.ts.end'='1000000',
    -- vc字段类型 随机值
    'fields.vc.kind'='random',
    -- 最小值 1
    'fields.vc.min'='1',
    -- 最大值 100
    'fields.vc.max'='100'
 
);
 
CREATE TABLE sink (
    id INT,
    ts BIGINT,
    vc INT
) WITH (
connector' = 'print'
);

2)查询源表

查询数据:select * from source;

注意:如果发现刷新不动,就退出去查看一下log4j输出了什么警告,有的警告可以忽略,但是有的可能就是原因。比如我是因为没有在环境变量中添加 HADOOP_CONF_DIR ,导致我的数据生成器不生成数据。

我们可以看到结果显示模式是 table 模式,这是默认的显示模式,我们在前面的常用配置里讲过,还有一种 changelog 模式可以设置。

我们再次查询:

我们可以看到,这种模式下,它的显示比 table 模式多了一列 op ,代表操作,+I 代表新增数据,撤回就是 -U。

此外还有一种模式叫做 tableau:

可以看到,这种模式喜爱,我们不会进入那个专门的数据展示界面,更加方便。

select * from source;

3)插入sink表并查询

创建 Sink表:

我们试着把 source 中的数据输出到 sink:

insert into sink select * from source;

可以看到它给我们返回了 一个 Job Id,我们可以直接查询 sink 表,或者也可以在 Web UI 中查看:

select * from sink;

或者

5.2、With子句

       WITH提供了一种编写辅助语句的方法,以便在较大的查询中使用。这些语句通常被称为公共表表达式(Common Table Expression, CTE),可以认为它们定义了仅为一个查询而存在的临时视图。

1)语法

WITH <with_item_definition> [ , ... ]
 
SELECT ... FROM ...;
 
 
 
<with_item_defintion>:
 
    with_item_name (column_name[, ...n]) AS ( <select_query> )

2)案例

我们查询这个临时表就相当于执行了 with 内部的查询,比如下面:

WITH source_with_total AS (
    SELECT id, vc+10 AS total
    FROM source
)
-- 注意这里没有分号 这两个句子是一个作业里面的
SELECT id, SUM(total) FROM source_with_total GROUP BY id;

我们查询 source_with_total 就相当于查询了它内部的语句:select id,vc+10 as total from source;当然,我们在查这张临时表的时候可以选择字段。

需要注意的地方就是我们生成临时表的句子和查询临时表的句子是一个语句没有分号的,它们同属于一个作业,这个临时表只在这里生效,就像帮我们的查询语句简化了一下,作业结束它也就不存在了。我们完全可以写成这样:

select id,vc+10 as total from source;

5.3、SELECT & WHERE 子句

1)语法

SELECT select_list FROM table_expression [ WHERE boolean_expression ]

2)案例

-- 自定义 Source 的数据
-- 不需要给表 t 的字段显示添加类型(添加会报错) flink会自动识别
SELECT id, price FROM (VALUES (1, 2.0), (2, 3.1)) AS t (order_id, price);
 
SELECT vc + 10 FROM source WHERE id >10;

通过查询结果,我们可以知道id=10的这条数据它的 vc 是<=10 的。

5.4、SELECT DISTINCT 子句

用作根据 key 进行数据去重

SELECT DISTINCT vc FROM source;

       对于流查询,计算查询结果所需的状态可能无限增长。状态大小取决于不同行数。可以设置适当的状态生存时间(TTL)的查询配置,以防止状态过大。但是,这可能会影响查询结果的正确性。如某个 key 的数据过期从状态中删除了,那么下次再来这么一个 key,由于在状态中找不到,就又会输出一遍。

Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(4)https://developer.aliyun.com/article/1532280

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2天前
|
SQL 机器学习/深度学习 分布式计算
MaxCompute产品使用问题之如何调整改变SQL查询的严格性
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
3天前
|
SQL 分布式计算 大数据
MaxCompute产品使用问题之如果oss文件过大,如何在不调整oss源文件大小的情况下优化查询sql
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
3天前
|
SQL 分布式计算 DataWorks
MaxCompute产品使用问题之如何通过临时查询功能来书写和运行SQL语句
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
5天前
|
SQL 前端开发 关系型数据库
零基础学习数据库SQL语句之查询表中数据的DQL语句
零基础学习数据库SQL语句之查询表中数据的DQL语句
6 0
|
9天前
|
SQL 关系型数据库 数据库
实时计算 Flink版操作报错合集之在本地执行代码没有问题,但是在线执行sql命令就会报错,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
9天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在执行SQL语句时遇到了类找不到,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
8天前
|
SQL IDE Java
Java连接SQL Server数据库的详细操作流程
Java连接SQL Server数据库的详细操作流程
|
16天前
|
SQL DataWorks NoSQL
DataWorks产品使用合集之如何将SQL Server中的数据转存到MongoDB
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
244 1
|
1月前
|
SQL API 流计算
实时计算 Flink版产品使用合集之在Mac M1下的Docker环境中开启SQL Server代理的操作步骤是什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
162 1
|
9天前
|
SQL 存储 关系型数据库
关系型数据库中的SQL Server
【6月更文挑战第11天】
44 3