在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table ddl如下: | CREATETABLE user_behavior ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3), proctime as PROCTIME(), -- 通过计算列产生一个处理时间列 WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列 ) WITH ( 'connector.type' = 'kafka', -- 使用 kafka connector 'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本 'connector.topic' = 'user_behavior', -- kafka topic 'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取 'connector.properties.zookeeper.connect' = 'localhost:2181', -- zookeeper 地址 'connector.properties.bootstrap.servers' = 'localhost:9092', -- kafka broker 地址 'format.type' = 'json'-- 数据源格式为 json ); | 在查询时select * from user_behavior;报错如下: [ERROR] Could not execute SQL statement. Reason: java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes: validated type: RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL converted type: RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT NULL rel: LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], ts=[$4], proctime=[$5]) LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL SECOND)]) LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2], behavior=[$3], ts=[$4], proctime=[PROCTIME()]) LogicalTableScan(table=[[myhive, my_db, user_behavior, source: [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])
flink版本:1.10.1 blink planner,streaming model
*来自志愿者整理的flink邮件归档
1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的, https://issues.apache.org/jira/browse/FLINK-17189 https://issues.apache.org/jira/browse/FLINK-17189
*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。