开发者社区> 问答> 正文

flink 1.11.2 rowtime和proctime流 Interval Join 问题错误问

我需要将事件时间的流同处理时间的流做Interval Join时提示错误,我是用的是flink 1.11.2版本,我的示例程序如下,请问为什么提示我是个常规join,而不是区间join呢?我该如何解决?     我的 l_table.l_rt = r_table.r_pt 可以运行成功,而l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND运行错误!package join;

> >import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >import org.apache.flink.table.api.EnvironmentSettings; >import org.apache.flink.table.api.Table; >import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > >public class Test1 { > >    public static void main(String[] args) { >        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); >        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); >        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); > >        String lTable = "CREATE TABLE l_table (  " + >                " l_a INT,  " + >                " l_b string,  " + >                " l_rt AS localtimestamp,  " + >                " WATERMARK FOR l_rt AS l_rt  " + >                ") WITH (  " + >                " 'connector' = 'datagen',  " + >                " 'rows-per-second'='5',  " + >                " 'fields.l_a.min'='1',  " + >                " 'fields.l_a.max'='5',  " + >                " 'fields.l_b.length'='5'  " + >                ")"; >        bsTableEnv.executeSql(lTable); > >        String rTable = "CREATE TABLE r_table (  " + >                " r_a INT,  " + >                " r_b string,  " + >                " r_pt AS proctime()  " + >                ") WITH (  " + >                " 'connector' = 'datagen',  " + >                " 'rows-per-second'='5',  " + >                " 'fields.r_a.min'='1',  " + >                " 'fields.r_a.max'='5',  " + >                " 'fields.r_b.length'='5'  " + >                ")"; >        bsTableEnv.executeSql(rTable); > >        String printTable = "CREATE TABLE print (" + >                "  l_a INT,  " + >                "  l_b string,  " + >                "  l_rt timestamp(3),  " + >                "  r_a INT,  " + >                "  r_b string,  " + >                "  r_pt timestamp(3)  " + >                ") WITH (  " + >                " 'connector' = 'print' " + >                ") "; > >        bsTableEnv.executeSql(printTable); > >        // 运行成功 >//        Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt = r_table.r_pt"); > >        // 运行错误,提示Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before. >        Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND"); > >        bsTableEnv.executeSql("insert into print select * from " + joinTable); > >    } > >}

*来自志愿者整理的flink邮件归档

展开
收起
说了是一只鲳鱼 2021-12-06 11:32:10 1445 0
1 条回答
写回答
取消 提交回答
  •    因为你的 2 个流的时间属性不一样,所以就认为不是 interval join。

       而在 match 到到了regular join 后,又因为 join 条件中有时间属性,故报了这个错。

       Interval join 需要 2 个流的时间属性一样,所以你需要对这 2 条流使用相同的时间属性。

    *来自志愿者整理的flink邮件归档

    2021-12-06 14:07:55
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载