
Blink compute engine: function cannot be used

尖山神仙 2020-06-04 10:24:03

In Flink, after switching to the Blink planner I still get this error. Is there something wrong with the way I am using it? The code is as follows:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
         properties.put("group.id", "test_status_compute");
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("offset.reset", "earlist");

    DataStream<String> stream = env
            .addSource(new FlinkKafkaConsumer<>("test_status", new SimpleStringSchema(), properties));
    stream.print("==1");
    DataStream<Row> row = stream.map(new MapFunction<String, Row>() {
        @Override
        public Row map(String value) throws Exception {
            String[] arrs = value.split("\\|");
            Row row = new Row(3);
            for (int i=0; i<arrs.length; i++) {
                row.setField(i+1, arrs[i]);
            }

            // row.setField(i, System.currentTimeMillis());
            return row;
        }
    }).returns(new RowTypeInfo(Types.STRING, Types.INT, Types.INT));
    row.print("row");

    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    TableConfig config = new TableConfig();

    config.setPlannerConfig(new PlannerConfig() {
    });
    StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings );
    tableEnvironment.registerDataStream("status", row, "a,b,c,proctime.proctime");


    tableEnvironment.sqlUpdate("select a first_value(b) from status");

Exception message:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Undefined function: FIRST_VALUE
    at org.apache.flink.table.expressions.resolver.LookupCallResolver.lambda$visit$0(LookupCallResolver.java:47)
    at java.util.Optional.orElseThrow(Optional.java:290)
    at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:47)
    at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:36)
    at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
    at org.apache.flink.table.expressions.LookupCallExpression.accept(LookupCallExpression.java:66)
    at org.apache.flink.table.expressions.resolver.LookupCallResolver.lambda$resolveChildren$1(LookupCallResolver.java:62)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.flink.table.expressions.resolver.LookupCallResolver.resolveChildren(LookupCallResolver.java:63)
    at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:56)
    at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:36)
    at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
    at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:135)
    at org.apache.flink.table.api.internal.TableImpl.lambda$select$0(TableImpl.java:123)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.flink.table.api.internal.TableImpl.select(TableImpl.java:124)
    at org.apache.flink.table.api.internal.TableImpl.select(TableImpl.java:117)
    at com.study.flink.state.Main.main(Main.java:79)
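For context, FIRST_VALUE is an aggregate function of the Blink planner, and the query above also has a syntax slip (missing comma after a). Below is a minimal, illustrative sketch of how the same query would normally be issued against the "status" table registered above, assuming the Blink planner dependencies are on the classpath; it is not the exact fix from the answer:

    // Illustrative sketch: FIRST_VALUE is an aggregate, so the query needs a GROUP BY,
    // and a plain SELECT is executed with sqlQuery rather than sqlUpdate.
    Table result = tableEnvironment.sqlQuery(
            "SELECT a, FIRST_VALUE(b) FROM status GROUP BY a");
    // A grouped aggregation over an unbounded stream emits updates, hence a retract stream.
    tableEnvironment.toRetractStream(result, Row.class).print("first_value");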

All answers (1)
  • 尖山神仙
    2020-06-04 17:12:19

    Adding the table dependency org.apache.flink : flink-table-runtime-blink_2.11 : ${flink.version} seems to fix it.
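    For readability, the same coordinates written out as a Maven dependency block (version placeholder kept as given in the answer):

        <!-- Blink runtime artifact mentioned above; presumably supplies the runtime classes
             the Blink planner needs before FIRST_VALUE resolves -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>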
