我使用tableEnv.sqlUpdate(ddl);方式创建表
但是我无法像通过流注册方式一样,添加rowtime。我尝试在定义字段上添加rowtime,但是创表语句会报错。
请问在flink中是否支持使用该种方式创建流表,并开窗?
我的例子是一个csv源,我还没有尝试kafka源,但是我看了文档,也没有找到相关描述。
sql创表语句如下:
CREATE TABLE T_UserBehavior(
userId BIGINT,
itemId BIGINT,
categoryId BIGINT,
behavior VARCHAR,
optime BIGINT
) WITH (
'connector.type' = 'filesystem', -- required: specify to connector type
'connector.path' = 'file:///E:\MyGitProject\flink-study\Hot-Item\src\main\resources\UserBehavior-less.csv', -- required: path to a file or directory
'format.type' = 'csv',
'format.fields.0.name' = 'userId', -- required: define the schema either by using type information
'format.fields.0.type' = 'BIGINT',
'format.fields.1.name' = 'itemId',
'format.fields.1.type' = 'BIGINT',
'format.fields.2.name' = 'categoryId',
'format.fields.2.type' = 'BIGINT',
'format.fields.3.name' = 'behavior',
'format.fields.3.type' = 'VARCHAR',
'format.fields.4.name' = 'optime',
'format.fields.4.type' = 'BIGINT'
);*来自志愿者整理的flink邮件归档
在DDL上定义rowtime是刚刚支持的功能,文档正在编写中。[1] 你可以通过master的代码来试用,社区正在准备发布1.10,到时候会有release版本可用。
[2] 中有使用的完整例子,FYI。
[1] https://issues.apache.org/jira/browse/FLINK-14320 [2] https://github.com/apache/flink/blob/2ecf7cacbe742099d78c528de962fccf13c14629/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TimeAttributeITCase.scala*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。