开发者社区> 问答> 正文

Blink计算引擎 函数无法使用

flink中,在使用blinkplanner后,还是报这个错误,请问是不是使用方法有问题,使用的代码如下: final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
         properties.put("group.id", "test_status_compute");
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("offset.reset", "earlist");

    DataStream<String> stream = env
            .addSource(new FlinkKafkaConsumer<>("test_status", new SimpleStringSchema(), properties));
    stream.print("==1");
    DataStream<Row> row = stream.map(new MapFunction<String, Row>() {
        @Override
        public Row map(String value) throws Exception {
            String[] arrs = value.split("\\|");
            Row row = new Row(3);
            for (int i=0; i<arrs.length; i++) {
                row.setField(i+1, arrs[i]);
            }

// row.setField(i, System.currentTimeMillis()); return row; } }).returns(new RowTypeInfo(Types.STRING, Types.INT, Types.INT)); row.print("row"); EnvironmentSettings settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); TableConfig config = new TableConfig();

    config.setPlannerConfig(new PlannerConfig() {
    });
    StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings );
    tableEnvironment.registerDataStream("status", row, "a,b,c,proctime.proctime");


    tableEnvironment.sqlUpdate("select a first_value(b) from status");

异常信息: Exception in thread "main" org.apache.flink.table.api.ValidationException: Undefined function: FIRST_VALUE at org.apache.flink.table.expressions.resolver.LookupCallResolver.lambda$visit$0(LookupCallResolver.java:47) at java.util.Optional.orElseThrow(Optional.java:290) at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:47) at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:36) at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37) at org.apache.flink.table.expressions.LookupCallExpression.accept(LookupCallExpression.java:66) at org.apache.flink.table.expressions.resolver.LookupCallResolver.lambda$resolveChildren$1(LookupCallResolver.java:62) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at org.apache.flink.table.expressions.resolver.LookupCallResolver.resolveChildren(LookupCallResolver.java:63) at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:56) at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:36) at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:135) at org.apache.flink.table.api.internal.TableImpl.lambda$select$0(TableImpl.java:123) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at org.apache.flink.table.api.internal.TableImpl.select(TableImpl.java:124) at org.apache.flink.table.api.internal.TableImpl.select(TableImpl.java:117) at com.study.flink.state.Main.main(Main.java:79)

展开
收起
尖山神仙 2020-06-04 10:24:03 960 0
1 条回答
写回答
取消 提交回答
  • 增加了 org.apache.flink flink-table-runtime-blink_2.11 ${flink.version} 这个表包貌似就可以了

    2020-06-04 17:12:19
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Blink计算引擎 立即下载
MaxComputeSQL2.0全新的计算引擎 立即下载
实时数仓“王炸组合”实时计算Flink版+Hologres 立即下载