Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(2)

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】

Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(1)https://developer.aliyun.com/article/1532272

4.2、

1)创建表

(1)语法

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
 
  (
    -- 正常的列 以及 元数据(比如Kafka数据携带的时间戳...)
    { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
    -- 水印
    [ <watermark_definition> ]
    -- 表的限制,比如主键
    [ <table_constraint> ][ , ...n]
 
  )
  -- 给表添加注释
  [COMMENT table_comment]
  -- 像 hive 一样 partition by
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  -- with 里面指定这张表的一些属性和参数,比如连接器...
  WITH (key1=val1, key2=val2, ...)
 
  [ LIKE source_table [( <like_options> )] | AS select_query ]

① physical_column_definition

       物理列是数据库中所说的常规列。其定义了物理介质中存储的数据中字段的名称、类型和顺序。其他类型的列可以在物理列之间声明,但不会影响最终的物理列的读取。

metadata_column_definition

       元数据列是 SQL 标准的扩展,允许访问数据源本身具有的一些元数据。元数据列由 METADATA 关键字标识。例如,我们可以使用元数据列从Kafka记录中读取和写入时间戳,用于基于时间的操作(这个时间戳不是数据中的某个时间戳字段,而是数据写入 Kafka 时,Kafka 引擎给这条数据打上的时间戳标记)。connector和format文档列出了每个组件可用的元数据字段。

CREATE TABLE MyTable (
 
  `user_id` BIGINT,
 
  `name` STRING,
  -- 把元数据赋值给 record_time 字段
  `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
 
) WITH (
 
  'connector' = 'kafka'
 
  ...
 
);

如果自定义的列名称和 Connector 中定义 metadata 字段的名称一样, FROM xxx 子句可省略

CREATE TABLE MyTable (
 
`user_id` BIGINT,
 
`name` STRING,
 
`timestamp` TIMESTAMP_LTZ(3) METADATA
 
) WITH (
 
'connector' = 'kafka'
 
...
 
);

如果自定义列的数据类型和 Connector 中定义的 metadata 字段的数据类型不一致,程序运行时会自动 cast强转,但是这要求两种数据类型是可以强转的。

CREATE TABLE MyTable (
 
`user_id` BIGINT,
 
`name` STRING,
 
-- 将时间戳强转为 BIGINT
 
`timestamp` BIGINT METADATA
 
) WITH (
 
'connector' = 'kafka'
 
...
 
);

默认情况下,Flink SQL planner 认为 metadata 列可以读取和写入。然而,在许多情况下,外部系统提供的只读元数据字段比可写字段多。因此,可以使用 VIRTUAL 关键字排除元数据列的持久化(表示只读)。

CREATE TABLE MyTable (
  -- 可读可写
  `timestamp` BIGINT METADATA,
  -- 只读
  `offset` BIGINT METADATA VIRTUAL,
 
  `user_id` BIGINT,
 
  `name` STRING,
 
) WITH (
 
  'connector' = 'kafka'
 
  ...
 
);

computed_column_definition

计算列是使用语法column_name AS computed_column_expression生成的虚拟列。

计算列就是拿已有的一些列经过一些自定义的运算生成的新列,在物理上并不存储在表中,只能读不能写。列的数据类型从给定的表达式自动派生,无需手动声明。

CREATE TABLE MyTable (
 
  `user_id` BIGINT,
 
  `price` DOUBLE,
 
  `quantity` DOUBLE,
  -- 把 price 列和 quanitity 列的值的乘积作为一个新列
  `cost` AS price * quanitity
 
) WITH (
 
  'connector' = 'kafka'
 
  ...
 
);

④ 定义Watermark

Flink SQL 提供了几种 WATERMARK 生产策略:

  • 严格升序:WATERMARK FOR rowtime_column AS rowtime_column。

Flink 任务认为时间戳只会越来越大,也不存在相等的情况,只要相等或者小于之前的,就认为是迟到的数据。

  • 递增:WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND

一般基本不用这种方式。如果设置此类,则允许有相同的时间戳出现。

  • 有界无序: WATERMARK FOR rowtime_column AS rowtime_column – INTERVAL 'string' timeUnit 。

此类策略就可以用于设置最大乱序时间,假如设置为 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND ,则生成的是运行 5s 延迟的Watermark。一般都用这种 Watermark 生成策略,此类 Watermark 生成策略通常用于有数据乱序的场景中,而对应到实际的场景中,数据都是会存在乱序的,所以基本都使用此类策略。

⑤ PRIMARY KEY

主键约束表明表中的一列或一组列是唯一的,并且它们不包含NULL值主键唯一地标识表中的一行,只支持 not enforced(这是语法规则,必须加上)。

CREATE TABLE MyTable (
 
`user_id` BIGINT,
 
`name` STRING,
 
PARYMARY KEY(user_id) not enforced
 
) WITH (
 
'connector' = 'kafka'
 
...
 
);

⑥ PARTITIONED BY

创建分区表

⑦ with语句

用于创建表的表属性,用于指定外部存储系统的元数据信息。配置属性时,表达式key1=val1的键和值都应该是字符串字面值。如下是Kafka的映射表:

CREATE TABLE KafkaTable (
 
`user_id` BIGINT,
 
`name` STRING,
 
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
 
) WITH (
 
'connector' = 'kafka',
 
'topic' = 'user_behavior',
 
'properties.bootstrap.servers' = 'localhost:9092',
 
'properties.group.id' = 'testGroup',
 
'scan.startup.mode' = 'earliest-offset',
 
'format' = 'csv'
 
)

一般 with 中的配置项由 Flink SQL 的 Connector(链接外部存储的连接器) 来定义,每种 Connector 提供的with 配置项都是不同的。

⑧ LIKE

用于基于现有表的定义创建表。此外,用户可以扩展原始表或排除表的某些部分。

可以使用该子句重用(可能还会覆盖)某些连接器属性,或者向外部定义的表添加水印。

CREATE TABLE Orders (
 
    `user` BIGINT,
 
    product STRING,
 
    order_time TIMESTAMP(3)
 
) WITH (
 
    'connector' = 'kafka',
 
    'scan.startup.mode' = 'earliest-offset'
 
);
CREATE TABLE Orders_with_watermark (
 
    -- Add watermark definition
 
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
 
) WITH (
 
    -- Overwrite the startup-mode
 
    'scan.startup.mode' = 'latest-offset'
 
)
 
LIKE Orders;

⑨ AS select_statement(CTAS)

在一个create-table-as-select (CTAS)语句中,还可以通过查询的结果创建和填充表。CTAS是使用单个命令创建数据并向表中插入数据的最简单、最快速的方法。

CREATE TABLE my_ctas_table
 
WITH (
 
    'connector' = 'kafka',
 
    ...
 
)
 
AS SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;

注意:CTAS有以下限制:

  • 暂不支持创建临时表。
  • 目前还不支持指定显式列(create table 后面不能自己写列字段)。
  • 还不支持指定显式水印(不能自己添加水印)。
  • 目前还不支持创建分区表。
  • 目前还不支持指定主键约束。

(2)简单建表示例

创建一个 test 表,指定连接器为 print :

用 like 关键字创建一个结构和 test 表一样的表 test1 并在它的基础上增加一个字段 value:

使用查询结果来新建一个表:

我们可以看到,我们表 test 的查询结果只能被当做一个 Sink 来使用(也就是只能被插入),不能被当做输入源。

2)查看表

(1)查看所有表

SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE <sql_like_pattern> ]

如果没有指定数据库,则从当前数据库返回表。

LIKE子句中sql pattern的语法与MySQL方言的语法相同:

  • %匹配任意数量的字符,甚至零字符,\%匹配一个'%'字符。
  • _只匹配一个字符,\_只匹配一个'_'字符

(2)查看表信息

{ DESCRIBE | DESC } [catalog_name.][db_name.]table_name

3)修改表

(1)修改表名

ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name

(2)修改表属性

ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)

表的属性,比如连接器等。

4)删除表

DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name

Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(3)https://developer.aliyun.com/article/1532278

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
791 43
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
286 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
3月前
|
SQL 监控 关系型数据库
一键开启百倍加速!RDS DuckDB 黑科技让SQL查询速度最高提升200倍
RDS MySQL DuckDB分析实例结合事务处理与实时分析能力,显著提升SQL查询性能,最高可达200倍,兼容MySQL语法,无需额外学习成本。
|
3月前
|
SQL 存储 关系型数据库
MySQL体系结构详解:一条SQL查询的旅程
本文深入解析MySQL内部架构,从SQL查询的执行流程到性能优化技巧,涵盖连接建立、查询处理、执行阶段及存储引擎工作机制,帮助开发者理解MySQL运行原理并提升数据库性能。
|
3月前
|
SQL 监控 关系型数据库
SQL优化技巧:让MySQL查询快人一步
本文深入解析了MySQL查询优化的核心技巧,涵盖索引设计、查询重写、分页优化、批量操作、数据类型优化及性能监控等方面,帮助开发者显著提升数据库性能,解决慢查询问题,适用于高并发与大数据场景。
|
2月前
|
SQL 关系型数据库 MySQL
(SQL)SQL语言中的查询语句整理
查询语句在sql中占了挺大一部分篇幅,因为在数据库中使用查询语句的次数远多于更新与删除命令。而查询语句比起其他语句要更加的复杂,可因为sql是数据库不可或缺的一部分,所以即使不懂,也必须得弄懂,以上。
219 0
|
4月前
|
SQL XML Java
通过MyBatis的XML配置实现灵活的动态SQL查询
总结而言,通过MyBatis的XML配置实现灵活的动态SQL查询,可以让开发者以声明式的方式构建SQL语句,既保证了SQL操作的灵活性,又简化了代码的复杂度。这种方式可以显著提高数据库操作的效率和代码的可维护性。
286 18
|
5月前
|
SQL 消息中间件 Kafka
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是 Apache Flink 提供的 SQL 引擎,支持流批一体处理,统一操作流数据与批数据,具备高性能、低延迟、丰富数据源支持及标准 SQL 兼容性,适用于实时与离线数据分析。
874 1
|
4月前
|
SQL 人工智能 数据库
【三桥君】如何正确使用SQL查询语句:避免常见错误?
三桥君解析了SQL查询中的常见错误和正确用法。AI产品专家三桥君通过三个典型案例:1)属性重复比较错误,应使用IN而非AND;2)WHERE子句中非法使用聚合函数的错误,应改用HAVING;3)正确的分组查询示例。三桥君还介绍了学生、课程和选课三个关系模式,并分析了SQL查询中的属性比较、聚合函数使用和分组查询等关键概念。最后通过实战练习帮助读者巩固知识,强调掌握这些技巧对提升数据库查询效率的重要性。
148 0
|
SQL NoSQL Java
Flink SQL 问题之执行报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
1104 2