In Flink, even after switching to the blink planner I still get this error. Am I using it incorrectly? The code is as follows:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.put("group.id", "test_status_compute");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("auto.offset.reset", "earliest");
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer<>("test_status", new SimpleStringSchema(), properties));
stream.print("==1");
DataStream<Row> row = stream.map(new MapFunction<String, Row>() {
    @Override
    public Row map(String value) throws Exception {
        // split on a literal pipe ("|" must be escaped, since split takes a regex)
        String[] arrs = value.split("\\|");
        Row row = new Row(3);
        for (int i = 0; i < arrs.length; i++) {
            row.setField(i, arrs[i]);
        }
        // row.setField(2, System.currentTimeMillis());
        return row;
    }
}).returns(new RowTypeInfo(Types.STRING, Types.INT, Types.INT));
row.print("row");
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
TableConfig config = new TableConfig();
config.setPlannerConfig(new PlannerConfig() { });
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
tableEnvironment.registerDataStream("status", row, "a,b,c,proctime.proctime");
Table result = tableEnvironment.sqlQuery("select a, first_value(b) from status group by a");
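(As an aside, unrelated to the FIRST_VALUE error: the mapper escapes the pipe as "\\|" because String.split takes a regular expression, and an unescaped "|" is the regex alternation operator. A quick standalone check of the difference:)

```java
public class SplitCheck {
    public static void main(String[] args) {
        String value = "a|1|2";

        // Unescaped "|" is alternation between two empty patterns; it matches
        // at every position, so each character becomes its own token.
        String[] wrong = value.split("|");
        System.out.println(wrong.length);   // 5

        // "\\|" matches a literal pipe character.
        String[] right = value.split("\\|");
        System.out.println(right.length);   // 3
    }
}
```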
Exception:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Undefined function: FIRST_VALUE
    at org.apache.flink.table.expressions.resolver.LookupCallResolver.lambda$visit$0(LookupCallResolver.java:47)
    at java.util.Optional.orElseThrow(Optional.java:290)
    at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:47)
    at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:36)
    at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
    at org.apache.flink.table.expressions.LookupCallExpression.accept(LookupCallExpression.java:66)
    at org.apache.flink.table.expressions.resolver.LookupCallResolver.lambda$resolveChildren$1(LookupCallResolver.java:62)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.flink.table.expressions.resolver.LookupCallResolver.resolveChildren(LookupCallResolver.java:63)
    at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:56)
    at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:36)
    at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
    at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:135)
    at org.apache.flink.table.api.internal.TableImpl.lambda$select$0(TableImpl.java:123)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.flink.table.api.internal.TableImpl.select(TableImpl.java:124)
    at org.apache.flink.table.api.internal.TableImpl.select(TableImpl.java:117)
    at com.study.flink.state.Main.main(Main.java:79)
Adding the org.apache.flink : flink-table-runtime-blink_2.11 : ${flink.version} table dependency seems to fix it.
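For reference, the dependency named above corresponds to a POM entry along these lines (assuming flink.version is already defined as a property in your POM):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime-blink_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
```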