
Flink error: Window can only be defined over a time attribute column

The following program fails with "Window can only be defined over a time attribute column" when execution reaches this line:

DataStream<Tuple2<Boolean, Result3>> resultStream  = tableEnv.toRetractStream(sqlQuery,Result3.class);

Main code logic:

  public class Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        TableConfig tc = new TableConfig();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv,tc);

        String[] para = {"--bootstrap.servers","localhost:9092",
                "--topic","Topic1022",
                "--group.id","test_consumer_group"};
        args = para;
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties());
        DataStream<String> stream = env.addSource(myConsumer);

        DataStream<Student> map = stream.map(new MapFunction<String, Student>() {
            //private static final long serialVersionUID = 1471936326697828381L;
            public Student map(String s) throws Exception {
                String[] split = s.split(",");
                return new Student(String.valueOf(split[0]),
                        String.valueOf(split[1]),
                        Double.valueOf(split[2]),
                        Timestamp.valueOf(split[3])
                );
            }
        });
        map.print(); // print the stream data

        // register the stream as table StudentTable
        tableEnv.registerDataStream("StudentTable", map, "season, name, score, rowtime");
        Table sqlQuery = tableEnv.sqlQuery("SELECT avg(score) as avg_score " +
                "FROM  StudentTable " +
                "GROUP BY HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE)");

        // 5. Print the result
        DataStream<Tuple2<Boolean, Result3>> resultStream  = tableEnv.toRetractStream(sqlQuery,Result3.class);
        resultStream.print();

        env.execute("userPv from Kafka");

    }
}
   

Maven dependencies:

<properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.8.0</flink.version>
        <slf4j.version>1.7.7</slf4j.version>
        <log4j.version>1.2.17</log4j.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Apache Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Table API dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
       
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Logging dependencies -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
    </dependencies>

Full stack trace:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
	at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:84)
	at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:96)
	at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:65)
	at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:315)
	at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
	at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415)
	at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:252)
	at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
	at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:211)
	at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:198)
	at org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:360)
	at org.apache.flink.table.api.TableEnvironment.runHepPlannerSequentially(TableEnvironment.scala:326)
	at org.apache.flink.table.api.TableEnvironment.optimizeNormalizeLogicalPlan(TableEnvironment.scala:282)
	at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:813)
	at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:862)
	at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:308)
	at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:251)
	at com.cmbchina.cc.main.Test.main(Test.java:60)


Thanks in advance for any help!

1 reply

    You need to define a time attribute column. The query groups by HOP(rowtime, ...), but registerDataStream declares rowtime as an ordinary Timestamp field, so the planner cannot find a time attribute to window over and rejects the query when the plan is translated (which is why the failure surfaces in toRetractStream). Declare rowtime as an event-time attribute with the ".rowtime" suffix after assigning timestamps and watermarks on the stream, or append a processing-time attribute with ".proctime" and window on that instead.
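
    A minimal sketch of the event-time variant, assuming Flink 1.8, that the Kafka source and the StreamTableEnvironment are built on the same StreamExecutionEnvironment (the posted code creates two), and that Student exposes a getRowtime() getter for its java.sql.Timestamp field (that getter name is an assumption):

        import org.apache.flink.streaming.api.TimeCharacteristic;
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
        import org.apache.flink.streaming.api.windowing.time.Time;

        // Event-time windows need record timestamps and watermarks on the stream.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<Student> withTimestamps = map.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Student>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(Student student) {
                        // Assumed getter: use the POJO's Timestamp field as the event time.
                        return student.getRowtime().getTime();
                    }
                });

        // The ".rowtime" suffix turns the plain "rowtime" field into an event-time
        // attribute backed by the record timestamps assigned above, so
        // HOP(rowtime, ...) in the SQL query can resolve a time attribute.
        tableEnv.registerDataStream("StudentTable", withTimestamps,
                "season, name, score, rowtime.rowtime");

    If event time is not required, a processing-time attribute also works: append "proctime.proctime" to the field list and group by HOP(proctime, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) instead; no watermark assignment is needed in that case.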

    2019-11-11 17:09:46