开发者社区> 问答> 正文

[flink-sql]使用tableEnv.sqlUpdate(ddl);方式创表,如何指定rowt

我使用tableEnv.sqlUpdate(ddl);方式创建表

但是我无法像通过流注册方式一样,添加rowtime。我尝试在定义字段上添加rowtime,但是创表语句会报错。

请问在flink中是否支持使用该种方式创建流表,并开窗?

我的例子是一个csv源,我还没有尝试kafka源,但是我看了文档,也没有找到相关描述。

sql创表语句如下:

CREATE TABLE T_UserBehavior(

userId BIGINT,

itemId BIGINT,

categoryId BIGINT,

behavior VARCHAR,

optime BIGINT

) WITH (

'connector.type' = 'filesystem', -- required: specify to connector type

'connector.path' = 'file:///E:\MyGitProject\flink-study\Hot-Item\src\main\resources\UserBehavior-less.csv', -- required: path to a file or directory

'format.type' = 'csv',

'format.fields.0.name' = 'userId', -- required: define the schema either by using type information

'format.fields.0.type' = 'BIGINT',

'format.fields.1.name' = 'itemId',

'format.fields.1.type' = 'BIGINT',

'format.fields.2.name' = 'categoryId',

'format.fields.2.type' = 'BIGINT',

'format.fields.3.name' = 'behavior',

'format.fields.3.type' = 'VARCHAR',

'format.fields.4.name' = 'optime',

'format.fields.4.type' = 'BIGINT'

);*来自志愿者整理的flink邮件归档

展开
收起
EXCEED 2021-12-08 11:41:05 1504 0
1 条回答
写回答
取消 提交回答
  • 在DDL上定义rowtime是刚刚支持的功能,文档正在编写中。[1] 你可以通过master的代码来试用,社区正在准备发布1.10,到时候会有release版本可用。

    [2] 中有使用的完整例子,FYI。

    [1] https://issues.apache.org/jira/browse/FLINK-14320 [2] https://github.com/apache/flink/blob/2ecf7cacbe742099d78c528de962fccf13c14629/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TimeAttributeITCase.scala*来自志愿者整理的flink邮件归档

    2021-12-08 14:35:40
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
SQL Server在电子商务中的应用与实践 立即下载
GeoMesa on Spark SQL 立即下载
原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载