The code below fails when execution reaches the following line, with the error: Window can only be defined over a time attribute column
DataStream<Tuple2<Boolean, Result3>> resultStream = tableEnv.toRetractStream(sqlQuery,Result3.class);
Main code logic:
public class Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        TableConfig tc = new TableConfig();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, tc);
        String[] para = {"--bootstrap.servers", "localhost:9092",
                "--topic", "Topic1022",
                "--group.id", "test_consumer_group"};
        args = para;
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties());
        DataStream<String> stream = env.addSource(myConsumer);
        DataStream<Student> map = stream.map(new MapFunction<String, Student>() {
            //private static final long serialVersionUID = 1471936326697828381L;
            public Student map(String s) throws Exception {
                String[] split = s.split(",");
                return new Student(String.valueOf(split[0]),
                        String.valueOf(split[1]),
                        Double.valueOf(split[2]),
                        Timestamp.valueOf(split[3])
                );
            }
        });
        map.print(); // print the stream data
        // register the stream as a table
        tableEnv.registerDataStream("StudentTable", map, "season, name, score, rowtime");
        Table sqlQuery = tableEnv.sqlQuery("SELECT avg(score) as avg_score " +
                "FROM StudentTable " +
                "GROUP BY HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE)");
        // 5. print the results
        DataStream<Tuple2<Boolean, Result3>> resultStream = tableEnv.toRetractStream(sqlQuery, Result3.class);
        resultStream.print();
        env.execute("userPv from Kafka");
    }
}
Maven dependencies:
<properties>
    <java.version>1.8</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.8.0</flink.version>
    <slf4j.version>1.7.7</slf4j.version>
    <log4j.version>1.2.17</log4j.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.8.0</version>
    </dependency>
    <!-- Apache Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink Table API dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.11</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Logging dependencies -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
</dependencies>
Full stack trace:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:84)
at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:96)
at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:65)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:315)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415)
at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:252)
at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:211)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:198)
at org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:360)
at org.apache.flink.table.api.TableEnvironment.runHepPlannerSequentially(TableEnvironment.scala:326)
at org.apache.flink.table.api.TableEnvironment.optimizeNormalizeLogicalPlan(TableEnvironment.scala:282)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:813)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:862)
at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:308)
at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:251)
at com.cmbchina.cc.main.Test.main(Test.java:60)
Thanks in advance for any answers!
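You need to define the window over a time attribute column. In your code, `rowtime` is registered as an ordinary Timestamp field, so the planner does not treat it as a time attribute and rejects the HOP window. Either declare it as an event-time attribute by writing `rowtime.rowtime` in the field list passed to `registerDataStream` (after setting the time characteristic to event time and assigning timestamps and watermarks on the DataStream), or append a processing-time attribute such as `proctime.proctime` and window over that instead.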
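For illustration, here is a minimal sketch of the event-time variant, assuming Flink 1.8 as in the pom above. Several things in it are assumptions rather than the asker's code: the `Student` class is a hypothetical stand-in (the question does not show it), the Kafka source is swapped for a few in-memory sample lines so the sketch runs on its own, the 5-second out-of-orderness bound is an arbitrary choice, and the result is read as a generic `Row` because `Result3` is not shown either. It also uses a single `StreamExecutionEnvironment` for both the stream and the table environment.

```java
import java.sql.Timestamp;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RowtimeSketch {

    // Hypothetical stand-in for the Student class, which the question does not show.
    public static class Student {
        public String season;
        public String name;
        public Double score;
        public Timestamp rowtime;

        public Student() {}

        public Student(String season, String name, Double score, Timestamp rowtime) {
            this.season = season;
            this.name = name;
            this.score = score;
            this.rowtime = rowtime;
        }
    }

    public static void main(String[] args) throws Exception {
        // One environment shared by the DataStream and the Table API.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Event time must be enabled so that periodic watermarks are generated.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Assumption: in-memory sample lines replace the Kafka source of the question.
        DataStream<Student> students = env
            .fromElements(
                "autumn,Tom,90.0,2019-10-22 10:00:00",
                "autumn,Ann,80.0,2019-10-22 10:01:30",
                "autumn,Bob,70.0,2019-10-22 10:03:00")
            .map(line -> {
                String[] split = line.split(",");
                return new Student(split[0], split[1],
                    Double.valueOf(split[2]), Timestamp.valueOf(split[3]));
            })
            .returns(Student.class)
            // Attach event timestamps and watermarks; 5 seconds of lateness is an arbitrary bound.
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Student>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(Student student) {
                        return student.rowtime.getTime();
                    }
                });

        // "rowtime.rowtime" declares the existing rowtime field as the event-time attribute.
        tableEnv.registerDataStream("StudentTable", students,
            "season, name, score, rowtime.rowtime");

        Table result = tableEnv.sqlQuery("SELECT avg(score) AS avg_score " +
            "FROM StudentTable " +
            "GROUP BY HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE)");

        tableEnv.toRetractStream(result, Row.class).print();
        env.execute("rowtime sketch");
    }
}
```

With the Kafka source from the question, the same two changes should apply: call `assignTimestampsAndWatermarks` on the mapped stream and register the field as `rowtime.rowtime`. If event time is not required, appending `proctime.proctime` to the field list and windowing over `proctime` avoids the need for watermarks.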