
Flink SQL: "Temporal table join" error when joining a dimension table

I am using Flink 1.10.0 with the BlinkPlanner, and I join dimension tables with the LEFT JOIN ... FOR SYSTEM_TIME AS OF syntax:

    select
        TUMBLE_END(l.dt, INTERVAL '30' SECOND) as index_time,
        l.extra_info['cityCode'] as city_code,
        v.vehicle_level as vehicle_level,
        CAST(COUNT(DISTINCT req_body['driverId']) as STRING) as index_value
    from x.log.yanfa_log AS l
    LEFT JOIN x.saic_auth_user.t_driver FOR SYSTEM_TIME AS OF l.proctime AS d
        ON l.req_body['driverId'] = d.uid
    LEFT JOIN x.saic_cms_config.t_vehicle FOR SYSTEM_TIME AS OF l.proctime AS v
        ON d.vin = v.vehicle_vin
    where l.ret_code = '0'
        and l.servicename = 'MatchGtw.uploadLocationV4'
        and l.req_body['appId'] = 'saic_card'
    GROUP BY
        TUMBLE(l.dt, INTERVAL '30' SECOND),
        l.extra_info['cityCode'],
        v.vehicle_level;

The CREATE TABLE statement uses computed columns:

    CREATE TABLE x.log.yanfa_log (
        dt TIMESTAMP(3),
        conn_id STRING,
        sequence STRING,
        trace_id STRING,
        span_info STRING,
        service_id STRING,
        msg_id STRING,
        servicename STRING,
        ret_code STRING,
        duration STRING,
        req_body MAP<STRING,STRING>,
        res_body MAP<STRING,STRING>,
        extra_info MAP<STRING,STRING>,
        proctime TIMESTAMP(3),
        WATERMARK FOR dt AS dt - INTERVAL '60' SECOND
    ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = '0.11',
        'connector.topic' = 'x-log-yanfa_log',
        'connector.properties.bootstrap.servers' = '******:9092',
        'connector.properties.zookeeper.connect' = '******:2181',
        'connector.startup-mode' = 'latest-offset',
        'update-mode' = 'append',
        'format.type' = 'json',
        'format.fail-on-missing-field' = 'true'
    );

The query fails with the following exception:

    Caused by: org.apache.flink.table.api.TableException: Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'
        at org.apache.flink.table.planner.plan.rules.physical.common.CommonLookupJoinRule$class.matches(CommonLookupJoinRule.scala:67)
        at org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnTableScanRule.matches(CommonLookupJoinRule.scala:118)
        at org.apache.flink.table.planner.plan.rules.physical.common.BaseSnapshotOnTableScanRule.matches(CommonLookupJoinRule.scala:131)
        at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:263)
        at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
        at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
        at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:370)
        at org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:247)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1534)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1807)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
        at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
        at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
        at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
        at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
        at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:90)
        at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:329)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1668)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:846)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:868)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:529)
        at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:324)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
        ... 20 more

From the Flink mailing list archive, compiled by volunteers.

毛毛虫雨 2021-12-05 09:07:55
1 answer
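  • I tried this out: proctime cannot be created in the table definition. It has to be generated in the SELECT with the PROCTIME() function.

    For example:

        SELECT …., PROCTIME() AS proctime FROM xxx;
        SELECT * FROM xxx t1 LEFT JOIN yyy FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.id = t2.id;

    Only then does the join work.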

    2021-12-05 10:47:39
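
Applied to the query in the question, the answer's suggestion could look like the sketch below. It is untested and rests on the answer's report that, in Flink 1.10.0, the lookup join accepts a processing-time attribute generated by PROCTIME() in the query. The subquery selects only the columns the outer query uses and deliberately leaves out the physical proctime TIMESTAMP(3) column from the DDL, so l.proctime refers to the generated attribute. The unqualified req_body in COUNT(DISTINCT ...) is written as l.req_body here for clarity.

```sql
SELECT
  TUMBLE_END(l.dt, INTERVAL '30' SECOND) AS index_time,
  l.extra_info['cityCode'] AS city_code,
  v.vehicle_level AS vehicle_level,
  CAST(COUNT(DISTINCT l.req_body['driverId']) AS STRING) AS index_value
FROM (
  -- Generate the processing-time attribute in the query rather than in the DDL.
  -- The physical proctime column of the source table is intentionally not selected.
  SELECT dt, req_body, extra_info, ret_code, servicename,
         PROCTIME() AS proctime
  FROM x.log.yanfa_log
) AS l
-- Both lookup joins reference the generated attribute l.proctime.
LEFT JOIN x.saic_auth_user.t_driver FOR SYSTEM_TIME AS OF l.proctime AS d
  ON l.req_body['driverId'] = d.uid
LEFT JOIN x.saic_cms_config.t_vehicle FOR SYSTEM_TIME AS OF l.proctime AS v
  ON d.vin = v.vehicle_vin
WHERE l.ret_code = '0'
  AND l.servicename = 'MatchGtw.uploadLocationV4'
  AND l.req_body['appId'] = 'saic_card'
GROUP BY
  TUMBLE(l.dt, INTERVAL '30' SECOND),
  l.extra_info['cityCode'],
  v.vehicle_level;
```

The event-time column dt is passed through the subquery unchanged, so the tumbling window should still be able to use it as the rowtime attribute.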