开发者社区> 问答> 正文

【FLINK】在同一个流上执行多次sql,导致第二个sql中的where条件不可用

游客6juj22qmvfpmk 2019-03-08 10:11:23 549

// Register a Kafka 0.8 JSON source ("t1"), explode each raw message with a
// user-defined table function via LATERAL TABLE, then filter rows by the
// computed map_length column before converting back to a DataStream.
//
// NOTE(review): the reported "CodeGenException: Invalid input access" is thrown
// inside DataStreamCorrelate/CommonCorrelate.generateCollector — i.e. the WHERE
// predicate on the derived map_length field is being pushed into the correlate's
// generated collector, where the referenced input field is not accessible. That
// points to a legacy-planner code-generation bug in this Flink version rather
// than an error in this snippet; upgrading Flink, or keeping the filter out of
// the correlate (e.g. filtering after toAppendStream), are the usual
// workarounds — TODO confirm against the exact Flink version in use.
List<String> fields = Lists.newArrayList("rawMessage");  // was a raw List

    // Source schema: each listed field is a STRING column mapped from the
    // JSON field of the same name.
    Schema schema = new Schema();
    for (String field : fields) {
        schema.field(field, Types.STRING()).from(field);
    }

    tableEnvironment
            .connect(
                    new Kafka()
                            .version("0.8")
                            .properties(properties)
                            .topic("raw_playtime_h5_source")
                            .startFromLatest()
            )
            // Tolerate missing JSON fields; derive the format from the schema.
            .withFormat(new Json().failOnMissingField(false).deriveSchema())
            .withSchema(schema)
            .inAppendMode()
            .registerTableSource("t1");

    // First query: cross-apply the split(...) table function to every row and
    // expose the exploded column as "maps" plus its cardinality.
    Table table1 = tableEnvironment
            .sqlQuery("select maps,CARDINALITY(maps) as map_length from t1 ,LATERAL TABLE(split(rawMessage,'\\t')) as T(maps) ");
    tableEnvironment.registerTable("t2", table1);

    // Second query on the intermediate table: this WHERE clause is what
    // triggers the codegen failure described above.
    Table table = tableEnvironment.sqlQuery("select maps,map_length from t2 where map_length=87 ");
    // Kept raw: toRowType() yields TypeInformation<Row>, but parameterizing it
    // here would require importing org.apache.flink.types.Row (import block not
    // visible in this fragment) — TODO tighten once imports are in view.
    TypeInformation typeInformation = table.getSchema().toRowType();

    String[] columns = table.getSchema().getFieldNames();
    // Convert the (append-only) result table back to a stream and flatten each
    // Row to a String via the project-defined PhysicTransformMap.
    DataStream<String> dataStream = tableEnvironment
            .toAppendStream(table, typeInformation)
            .map(new PhysicTransformMap(columns, 0));

    dataStream.print();
    try {
        env.execute();
    } catch (Exception e) {
        // NOTE(review): printStackTrace only — a real job should log and/or
        // rethrow so the failure is not silently swallowed.
        e.printStackTrace();
    }

如果把table中的where条件去掉,程序ok,但是只要带上where条件,即不能正常运行
Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: Invalid input access.

at org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587)

at scala.Option.getOrElse(Option.scala:120)
at org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:587)
at org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:66)
at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)

at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)

at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
at org.apache.flink.table.plan.nodes.CommonCorrelate$class.generateCollector(CommonCorrelate.scala:155)
at org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.generateCollector(DataStreamCorrelate.scala:38)
at org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.translateToPlan(DataStreamCorrelate.scala:116)
at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:967)
at org.apache.flink.table.api.StreamTableEnvironment.explain(StreamTableEnvironment.scala:1002)
分享到
取消 提交回答
全部回答(0)
阿里云实时计算
使用钉钉扫一扫加入圈子
+ 订阅

一套基于Apache Flink构建的一站式、高性能实时大数据处理平台,广泛适用于流式数据处理、离线数据处理、DataLake计算等场景。

推荐文章
相似问题
链接