I need to do an interval join between an event-time stream and a processing-time stream, but I get an error. I am using Flink 1.11.2, and my sample program is below. Why does the error say this is a regular join rather than an interval join, and how can I fix it? The condition l_table.l_rt = r_table.r_pt runs successfully, but l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND fails!

package join;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Test1 {

    public static void main(String[] args) {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        // Left table: l_rt is an event-time (rowtime) attribute because it has a watermark.
        String lTable = "CREATE TABLE l_table ( " +
                " l_a INT, " +
                " l_b string, " +
                " l_rt AS localtimestamp, " +
                " WATERMARK FOR l_rt AS l_rt " +
                ") WITH ( " +
                " 'connector' = 'datagen', " +
                " 'rows-per-second'='5', " +
                " 'fields.l_a.min'='1', " +
                " 'fields.l_a.max'='5', " +
                " 'fields.l_b.length'='5' " +
                ")";
        bsTableEnv.executeSql(lTable);

        // Right table: r_pt is a processing-time attribute.
        String rTable = "CREATE TABLE r_table ( " +
                " r_a INT, " +
                " r_b string, " +
                " r_pt AS proctime() " +
                ") WITH ( " +
                " 'connector' = 'datagen', " +
                " 'rows-per-second'='5', " +
                " 'fields.r_a.min'='1', " +
                " 'fields.r_a.max'='5', " +
                " 'fields.r_b.length'='5' " +
                ")";
        bsTableEnv.executeSql(rTable);

        String printTable = "CREATE TABLE print (" +
                " l_a INT, " +
                " l_b string, " +
                " l_rt timestamp(3), " +
                " r_a INT, " +
                " r_b string, " +
                " r_pt timestamp(3) " +
                ") WITH ( " +
                " 'connector' = 'print' " +
                ") ";

        bsTableEnv.executeSql(printTable);

        // Runs successfully
        // Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt = r_table.r_pt");

        // Fails with: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
        Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND");

        bsTableEnv.executeSql("insert into print select * from " + joinTable);

    }

}
*From the Flink mailing list archive, compiled by volunteers
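Because the two streams use different time attributes (event time on l_table, processing time on r_table), the planner does not treat the query as an interval join.
Once the query is matched as a regular join instead, the join condition still contains a time attribute, which is why this error is raised.
An interval join requires both streams to have the same kind of time attribute, so you need to use the same time attribute on both streams.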
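One possible way to follow this advice is to switch the left table to processing time as well, so that both sides of the BETWEEN condition are proctime attributes. The sketch below only builds the SQL strings. The column name l_pt and the class IntervalJoinSketch are hypothetical names introduced for illustration, and the statements have not been verified against Flink 1.11.2. It assumes processing-time semantics are acceptable for the left stream; if event time is needed, the alternative is to give r_table a rowtime column with a watermark instead.

```java
// Sketch only: SQL for an interval join where both tables use processing time.
// IntervalJoinSketch and the column l_pt are hypothetical names, not from the original program.
class IntervalJoinSketch {

    // Variant of l_table from the question: the event-time column l_rt and its
    // watermark are replaced by a processing-time column l_pt (assumption).
    static String leftTableDdl() {
        return "CREATE TABLE l_table ( " +
                " l_a INT, " +
                " l_b STRING, " +
                " l_pt AS proctime() " +
                ") WITH ( " +
                " 'connector' = 'datagen', " +
                " 'rows-per-second'='5', " +
                " 'fields.l_a.min'='1', " +
                " 'fields.l_a.max'='5', " +
                " 'fields.l_b.length'='5' " +
                ")";
    }

    // Same join as in the question, but both time columns are now proctime attributes.
    static String intervalJoinQuery() {
        return "SELECT * FROM l_table JOIN r_table " +
                "ON l_table.l_a = r_table.r_a " +
                "AND l_table.l_pt BETWEEN r_table.r_pt - INTERVAL '10' SECOND " +
                "AND r_table.r_pt + INTERVAL '5' SECOND";
    }

    public static void main(String[] args) {
        // In the original program these strings would be passed to
        // bsTableEnv.executeSql(...) and bsTableEnv.sqlQuery(...) respectively.
        System.out.println(leftTableDdl());
        System.out.println(intervalJoinQuery());
    }
}
```

The r_table definition from the question can stay as it is, since r_pt is already declared with proctime().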