开发者社区> 问答> 正文

flink使用hive作为维表,kafka作为数据源,join时候报错怎么办?

select .... FROM jdqTableSources AS a JOIN tmmmp FOR SYSTEM_TIME AS OF a.proctime AS b

Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Temporal Table Join requires primary key in versioned table, but no primary key can be found. The physical plan is: FlinkLogicalJoin(condition=[AND(=($0, $4), __INITIAL_TEMPORAL_JOIN_CONDITION($3, __TEMPORAL_JOIN_LEFT_KEY($0), __TEMPORAL_JOIN_RIGHT_KEY($4)))], joinType=[inner]) FlinkLogicalCalc(select=[opt, src, cur, PROCTIME() AS proctime]) FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, jdqTableSources]], fields=[mid, db, sch, tab, opt, ts, ddl, err, src, cur, cus]) FlinkLogicalSnapshot(period=[$cor0.proctime]) FlinkLogicalCalc(select=[item_sku_id, premium, cate_lev, type, borc]) FlinkLogicalTableSourceScan(table=[[myhive, dev, dev_brokenscreen_insurance_sku_info]], fields=[item_sku_id, item_sku_name, premium, cate_lev, type, borc, plan_code, subjection_b, product_name, lev_low_price, lev_upp_price, jd_price, shelves_tm, item_first_cate_name, item_second_cate_name, item_third_cate_name, sure_cate_lev, flag])*来自志愿者整理的flink邮件归档

展开
收起
EXCEED 2021-12-02 11:06:58 1722 0
1 条回答
写回答
取消 提交回答
  • 你看异常信息,提示时态表join的时候需要主键,但是你没有定义。而且你join的时候不需要on吗?*来自志愿者整理的FLINK邮件归档

    2021-12-02 11:23:37
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Hive Bucketing in Apache Spark 立即下载
spark替代HIVE实现ETL作业 立即下载
2019大数据技术公开课第五季—Hive迁移到MaxCompute最佳实践 立即下载