Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(1)https://developer.aliyun.com/article/1532272
4.2、 表
1)创建表
(1)语法
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name ( -- 正常的列 以及 元数据(比如Kafka数据携带的时间戳...) { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n] -- 水印 [ <watermark_definition> ] -- 表的限制,比如主键 [ <table_constraint> ][ , ...n] ) -- 给表添加注释 [COMMENT table_comment] -- 像 hive 一样 partition by [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)] -- with 里面指定这张表的一些属性和参数,比如连接器... WITH (key1=val1, key2=val2, ...) [ LIKE source_table [( <like_options> )] | AS select_query ]
① physical_column_definition
物理列是数据库中所说的常规列。其定义了物理介质中存储的数据中字段的名称、类型和顺序。其他类型的列可以在物理列之间声明,但不会影响最终的物理列的读取。
② metadata_column_definition
元数据列是 SQL 标准的扩展,允许访问数据源本身具有的一些元数据。元数据列由 METADATA 关键字标识。例如,我们可以使用元数据列从Kafka记录中读取和写入时间戳,用于基于时间的操作(这个时间戳不是数据中的某个时间戳字段,而是数据写入 Kafka 时,Kafka 引擎给这条数据打上的时间戳标记)。connector和format文档列出了每个组件可用的元数据字段。
CREATE TABLE MyTable ( `user_id` BIGINT, `name` STRING, -- 把元数据赋值给 record_time 字段 `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' ) WITH ( 'connector' = 'kafka' ... );
如果自定义的列名称和 Connector 中定义 metadata 字段的名称一样, FROM xxx 子句可省略
CREATE TABLE MyTable ( `user_id` BIGINT, `name` STRING, `timestamp` TIMESTAMP_LTZ(3) METADATA ) WITH ( 'connector' = 'kafka' ... );
如果自定义列的数据类型和 Connector 中定义的 metadata 字段的数据类型不一致,程序运行时会自动 cast强转,但是这要求两种数据类型是可以强转的。
CREATE TABLE MyTable ( `user_id` BIGINT, `name` STRING, -- 将时间戳强转为 BIGINT `timestamp` BIGINT METADATA ) WITH ( 'connector' = 'kafka' ... );
默认情况下,Flink SQL planner 认为 metadata 列可以读取和写入。然而,在许多情况下,外部系统提供的只读元数据字段比可写字段多。因此,可以使用 VIRTUAL 关键字排除元数据列的持久化(表示只读)。
CREATE TABLE MyTable ( -- 可读可写 `timestamp` BIGINT METADATA, -- 只读 `offset` BIGINT METADATA VIRTUAL, `user_id` BIGINT, `name` STRING, ) WITH ( 'connector' = 'kafka' ... );
③ computed_column_definition
计算列是使用语法column_name AS computed_column_expression生成的虚拟列。
计算列就是拿已有的一些列经过一些自定义的运算生成的新列,在物理上并不存储在表中,只能读不能写。列的数据类型从给定的表达式自动派生,无需手动声明。
CREATE TABLE MyTable ( `user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE, -- 把 price 列和 quanitity 列的值的乘积作为一个新列 `cost` AS price * quanitity ) WITH ( 'connector' = 'kafka' ... );
④ 定义Watermark
Flink SQL 提供了几种 WATERMARK 生产策略:
- 严格升序:WATERMARK FOR rowtime_column AS rowtime_column。
Flink 任务认为时间戳只会越来越大,也不存在相等的情况,只要相等或者小于之前的,就认为是迟到的数据。
- 递增:WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND 。
一般基本不用这种方式。如果设置此类,则允许有相同的时间戳出现。
- 有界无序: WATERMARK FOR rowtime_column AS rowtime_column – INTERVAL 'string' timeUnit 。
此类策略就可以用于设置最大乱序时间,假如设置为 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND ,则生成的是运行 5s 延迟的Watermark。一般都用这种 Watermark 生成策略,此类 Watermark 生成策略通常用于有数据乱序的场景中,而对应到实际的场景中,数据都是会存在乱序的,所以基本都使用此类策略。
⑤ PRIMARY KEY
主键约束表明表中的一列或一组列是唯一的,并且它们不包含NULL值。主键唯一地标识表中的一行,只支持 not enforced(这是语法规则,必须加上)。
CREATE TABLE MyTable ( `user_id` BIGINT, `name` STRING, PARYMARY KEY(user_id) not enforced ) WITH ( 'connector' = 'kafka' ... );
⑥ PARTITIONED BY
创建分区表
⑦ with语句
用于创建表的表属性,用于指定外部存储系统的元数据信息。配置属性时,表达式key1=val1的键和值都应该是字符串字面值。如下是Kafka的映射表:
CREATE TABLE KafkaTable ( `user_id` BIGINT, `name` STRING, `ts` TIMESTAMP(3) METADATA FROM 'timestamp' ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' )
一般 with 中的配置项由 Flink SQL 的 Connector(链接外部存储的连接器) 来定义,每种 Connector 提供的with 配置项都是不同的。
⑧ LIKE
用于基于现有表的定义创建表。此外,用户可以扩展原始表或排除表的某些部分。
可以使用该子句重用(可能还会覆盖)某些连接器属性,或者向外部定义的表添加水印。
CREATE TABLE Orders ( `user` BIGINT, product STRING, order_time TIMESTAMP(3) ) WITH ( 'connector' = 'kafka', 'scan.startup.mode' = 'earliest-offset' );
CREATE TABLE Orders_with_watermark ( -- Add watermark definition WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND ) WITH ( -- Overwrite the startup-mode 'scan.startup.mode' = 'latest-offset' ) LIKE Orders;
⑨ AS select_statement(CTAS)
在一个create-table-as-select (CTAS)语句中,还可以通过查询的结果创建和填充表。CTAS是使用单个命令创建数据并向表中插入数据的最简单、最快速的方法。
CREATE TABLE my_ctas_table WITH ( 'connector' = 'kafka', ... ) AS SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;
注意:CTAS有以下限制:
- 暂不支持创建临时表。
- 目前还不支持指定显式列(create table 后面不能自己写列字段)。
- 还不支持指定显式水印(不能自己添加水印)。
- 目前还不支持创建分区表。
- 目前还不支持指定主键约束。
(2)简单建表示例
创建一个 test 表,指定连接器为 print :
用 like 关键字创建一个结构和 test 表一样的表 test1 并在它的基础上增加一个字段 value:
使用查询结果来新建一个表:
我们可以看到,我们表 test 的查询结果只能被当做一个 Sink 来使用(也就是只能被插入),不能被当做输入源。
2)查看表
(1)查看所有表
SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE <sql_like_pattern> ]
如果没有指定数据库,则从当前数据库返回表。
LIKE子句中sql pattern的语法与MySQL方言的语法相同:
- %匹配任意数量的字符,甚至零字符,\%匹配一个'%'字符。
- _只匹配一个字符,\_只匹配一个'_'字符
(2)查看表信息
{ DESCRIBE | DESC } [catalog_name.][db_name.]table_name
3)修改表
(1)修改表名
ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name
(2)修改表属性
ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)
表的属性,比如连接器等。
4)删除表
DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name
Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(3)https://developer.aliyun.com/article/1532278