Flink SQL 中动态修改 DDL 的属性

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Flink 支持使用 HiveCatalog 来管理 Flink 的元数据信息, HiveCatalog 通过授权用户仅创建一次表和其他元数据对象,这样就避免了重复创建 kafka 流表,节省了大量的时间, 对于不同的用户来说,可以直接使用表而不需要再去创建.就拿消费 kafka 来说,经常会有下面的需求:

Flink 支持使用 HiveCatalog 来管理 Flink 的元数据信息, HiveCatalog 通过授权用户仅创建一次表和其他元数据对象,这样就避免了重复创建 kafka 流表,节省了大量的时间, 对于不同的用户来说,可以直接使用表而不需要再去创建.


就拿消费 kafka 来说,经常会有下面的需求:


•用户需要指定特性的消费时间戳,即修改 scan.startup.timestamp-millis 属性.•用户想忽略掉解析错误,需要将 format.ignore-parse-errors 改为 true.•不同用户想要设置不同的 group.id 去消费数据.•用户想要改变消费 kafka 的策略比如从最早的 offset 开始消费数据.


类似于这样的需求很常见,有的时候只是临时修改一下属性,难道需要把表删了,再重新创建新的表吗 ? 显然不是的,下面就介绍两种方式去修改表的属性.


1, ALTER TABLE

•重命名表


ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name


将给定的表名重命名为另一个新的表名。


•设置或更改表属性


ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)

在指定的表中设置一个或多个属性。如果表中已经设置了特定属性,会用新属性覆盖旧值。


2, Dynamic Table Options


从 1.11 开始,用户可以通过动态参数的形式灵活地设置表的属性参数,覆盖或者追加原表的 WITH (...) 语句内定义的 table options。


基本语法为:


table_path /*+ OPTIONS(key=val [, key=val]*) */


动态参数的使用没有语境限制,只要是引用表的地方都可以追加定义。在指定的表后面追加的动态参数会自动追加到原表定义中,是不是很方便呢


由于可能对查询结果有影响,动态参数功能默认是关闭的, 使用下面的方式开启该功能:


set table.dynamic-table-options.enabled=true;


下面就来分别演示一下这两种方法的具体操作


3, 建表


DROP TABLE IF EXISTS KafkaTable;
CREATE TABLE KafkaTable (
  `age` BIGINT,
  `name` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  `topic` STRING METADATA VIRTUAL,
   WATERMARK FOR ts AS ts - INTERVAL '0' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'new_flink_topic',
  'properties.bootstrap.servers' = 'master:9092,storm1:9092,storm2:9092',
  'properties.group.id' = 'flink_jason',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false', -- 字段丢失任务不失败
  'json.ignore-parse-errors' = 'true'  -- 解析失败跳过
);
DROP TABLE IF EXISTS print_table;
CREATE TABLE print_table
(
    name string,
    pv BIGINT,
    uv BIGINT
)
WITH ('connector' = 'print');
insert into print_table 
SELECT name,
count(*) as pv,
count(distinct name) as uv 
from KafkaTable 
group by name;


上面把 3 条语句放在一块执行了,然后先在 Flink SQL 里面查一下刚才建的 kafka 流表和打印数据的结果表



然后再到 hive 里面查一下这两个表是否存在



可以看到这两个表在 hive 里面也是存在的,因为已经配置了 HiveCatalog 后面就不用再重复建表了,可以直接使用这两个表.


再来看一下 hive 里面的表结构信息.


hive> show create table kafkatable ;
OK
CREATE TABLE `kafkatable`(
)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://master:9000/hive/warehouse/mydatabase.db/kafkatable'
TBLPROPERTIES (
  'flink.connector'='kafka', 
  'flink.format'='json', 
  'flink.json.fail-on-missing-field'='false', 
  'flink.json.ignore-parse-errors'='true', 
  'flink.properties.bootstrap.servers'='master:9092,storm1:9092,storm2:9092', 
  'flink.properties.group.id'='flink_jason', 
  'flink.scan.startup.mode'='latest-offset', 
  'flink.schema.0.data-type'='BIGINT', 
  'flink.schema.0.name'='age', 
  'flink.schema.1.data-type'='VARCHAR(2147483647)', 
  'flink.schema.1.name'='name', 
  'flink.schema.2.data-type'='TIMESTAMP(3)', 
  'flink.schema.2.metadata'='timestamp', 
  'flink.schema.2.name'='ts', 
  'flink.schema.2.virtual'='false', 
  'flink.schema.3.data-type'='BIGINT', 
  'flink.schema.3.metadata'='partition', 
  'flink.schema.3.name'='partition', 
  'flink.schema.3.virtual'='true', 
  'flink.schema.4.data-type'='BIGINT', 
  'flink.schema.4.metadata'='offset', 
  'flink.schema.4.name'='offset', 
  'flink.schema.4.virtual'='true', 
  'flink.schema.5.data-type'='VARCHAR(2147483647)', 
  'flink.schema.5.metadata'='topic', 
  'flink.schema.5.name'='topic', 
  'flink.schema.5.virtual'='true', 
  'flink.schema.watermark.0.rowtime'='ts', 
  'flink.schema.watermark.0.strategy.data-type'='TIMESTAMP(3)', 
  'flink.schema.watermark.0.strategy.expr'='`ts` - INTERVAL \'0\' SECOND', 
  'flink.topic'='new_flink_topic', 
  'is_generic'='true', 
  'transient_lastDdlTime'='1608409788')
Time taken: 0.046 seconds, Fetched: 44 row(s)


可以看到 'flink.scan.startup.mode'='latest-offset' 这个属性是从最新的位置开始消费数据的.然后我们来看一下 Flink 的 UI



因为是从最新的 offset 位置开始消费的,我没有写入新数据,所以消费到的数据是 0 条.然后我把任务停掉, 加上动态表选项,修改为从头开始消费,然后再提交任务.


insert into print_table 
SELECT name,
count(*) as pv,
count(distinct name) as uv 
from KafkaTable /*+ OPTIONS('scan.startup.mode'='earliest-offset') */
group by name;


可以看到在我没有写入新数据的情况下,已经消费到 534 条数据.也可以到 tm 的 stdout 里面观察一下数据打印的变化情况,说明刚才修改的 'scan.startup.mode'='earliest-offset' 属性是起作用的,任务是从头开始消费数据.


然后我再用 alter table 来直接修改表里面的属性.


alter table myhive.mydatabase.KafkaTable set('scan.startup.mode'='earliest-offset');


先不提交任务,先到 hive 表里看下属性是否修改了.


> show create table kafkatable ;
OK
CREATE TABLE `kafkatable`(
)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://master:9000/hive/warehouse/mydatabase.db/kafkatable'
TBLPROPERTIES (
  'flink.connector'='kafka', 
  'flink.format'='json', 
  'flink.json.fail-on-missing-field'='false', 
  'flink.json.ignore-parse-errors'='true', 
  'flink.properties.bootstrap.servers'='master:9092,storm1:9092,storm2:9092', 
  'flink.properties.group.id'='flink_jason', 
  'flink.scan.startup.mode'='earliest-offset', 
  'flink.schema.0.data-type'='BIGINT', 
  'flink.schema.0.name'='age', 
  'flink.schema.1.data-type'='VARCHAR(2147483647)', 
  'flink.schema.1.name'='name', 
  'flink.schema.2.data-type'='TIMESTAMP(3)', 
  'flink.schema.2.metadata'='timestamp', 
  'flink.schema.2.name'='ts', 
  'flink.schema.2.virtual'='false', 
  'flink.schema.3.data-type'='BIGINT', 
  'flink.schema.3.metadata'='partition', 
  'flink.schema.3.name'='partition', 
  'flink.schema.3.virtual'='true', 
  'flink.schema.4.data-type'='BIGINT', 
  'flink.schema.4.metadata'='offset', 
  'flink.schema.4.name'='offset', 
  'flink.schema.4.virtual'='true', 
  'flink.schema.5.data-type'='VARCHAR(2147483647)', 
  'flink.schema.5.metadata'='topic', 
  'flink.schema.5.name'='topic', 
  'flink.schema.5.virtual'='true', 
  'flink.schema.watermark.0.rowtime'='ts', 
  'flink.schema.watermark.0.strategy.data-type'='TIMESTAMP(3)', 
  'flink.schema.watermark.0.strategy.expr'='`ts` - INTERVAL \'0\' SECOND', 
  'flink.topic'='new_flink_topic', 
  'is_generic'='true', 
  'transient_lastDdlTime'='1608411139')
Time taken: 0.043 seconds, Fetched: 44 row(s)


'flink.scan.startup.mode'='earliest-offset' 可以看到这个属性已经被修改了.然后直接提交任务就可以了,不需要再加刚才的那个动态表属性.



还是刚才的 534 条数据,这两种方式都可以解决上面的用户需求.


区别:


alter table 的方式是直接修改表的属性, 元数据信息也会被修改, Dynamic Table Options 的方式只是通过动态参数的形式追加或者覆盖原表的属性,但是不会修改元数据信息.建议在生产环境还是使用第二种方式比较好.

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
3月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
785 43
|
3月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
282 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
4月前
|
SQL 消息中间件 Kafka
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是 Apache Flink 提供的 SQL 引擎,支持流批一体处理,统一操作流数据与批数据,具备高性能、低延迟、丰富数据源支持及标准 SQL 兼容性,适用于实时与离线数据分析。
860 1
|
6月前
|
SQL 关系型数据库 MySQL
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
Apache Flink CDC 3.4.0 版本正式发布!经过4个月的开发,此版本强化了对高频表结构变更的支持,新增 batch 执行模式和 Apache Iceberg Sink 连接器,可将数据库数据全增量实时写入 Iceberg 数据湖。51位贡献者完成了259次代码提交,优化了 MySQL、MongoDB 等连接器,并修复多个缺陷。未来 3.5 版本将聚焦脏数据处理、数据限流等能力及 AI 生态对接。欢迎下载体验并提出反馈!
1183 1
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
|
10月前
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
1920 27
|
11月前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
826 14
|
SQL NoSQL Java
Flink SQL 问题之执行报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
1099 2
|
SQL Java 关系型数据库
Flink SQL 问题之用代码执行报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
1476 6
|
SQL 消息中间件 Oracle
Flink SQL 问题之写入ES报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
213 4
|
SQL JSON Java
Flink SQL 问题之重启报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
406 3