开发者社区> 问答> 正文

FlinkSLQ双流join使用LAST_VALUE + INTERVAL JOIN时遇到问题

flink 版本 1.11.2 问题:双流Join时,使用last_value + interval join,报错:Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.

代码: // stream 1 create table kafkaSource1 ( id int, field_1 int, field_2 varchar, ts1 timestamp(3), watermark for ts1 ) with ( connector = kafka ) // stream 2 create table kafkaSource2 ( id int, field_3 ts2 timestamp(3), watermark for ts2 ) with ( connector = kafka )

//create view create view kafkaSource1_view as select field_1 as field_1, last_value(field_2) as field_2, last_value(ts1) as ts1 from kafkaSouce1 group by field_1

// query insert into sinkTable select a.field_1, b.field_3 from kafkaSource2 a join kafkaSource1_view b on a.id = b.id and a.ts >= b.ts - INTERVAL ‘1’ HOUR and a.ts < b.ts + INTERVAL ‘2' DAY*来自志愿者整理的flink邮件归档

展开
收起
说了是一只鲳鱼 2021-12-07 11:13:34 1171 0
1 条回答
写回答
取消 提交回答
  • 从报错来看,这个 SQL 应该是 match 了 StreamExecJoinRule,而 regular join 不能有 rowtime 属性。 应该是因为你的 kafkaSouce1 table 的 rowtime 经过 group by 后使用了 last_value 导致不是时间属性类型->TimeIndicatorRelDataType,而在 rule 进行判断后没有 windowBounds,所以就报了现在这个错误了。*来自志愿者整理的flink邮件归档

    2021-12-07 11:25:41
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Lazy Join Optimizations Without Upfront Statistics 立即下载
Lazy-Join Optimizations withou 立即下载
低代码开发师(初级)实战教程 立即下载