// Source schema: a single STRING column carrying the raw Kafka payload.
// Use the generic type List<String> instead of the raw type so the loop below
// needs no casts.
List<String> fields = Lists.newArrayList("rawMessage");
Schema schema = new Schema();
for (String field : fields) {
    schema.field(field, Types.STRING()).from(field);
}
// Register a Kafka 0.8 source table "t1" reading JSON records; missing JSON
// fields are tolerated (failOnMissingField(false)) and the format schema is
// derived from the table schema above.
tableEnvironment
    .connect(
        new Kafka()
            .version("0.8")
            .properties(properties)
            .topic("raw_playtime_h5_source")
            .startFromLatest()
    )
    .withFormat(new Json().failOnMissingField(false).deriveSchema())
    .withSchema(schema)
    .inAppendMode()
    .registerTableSource("t1");
// Explode each raw message on TAB via the user-defined table function `split`
// (cross join with LATERAL TABLE), keeping the split result and its length.
Table table1 = tableEnvironment
    .sqlQuery("select maps,CARDINALITY(maps) as map_length from t1 ,LATERAL TABLE(split(rawMessage,'\\t')) as T(maps) ");
tableEnvironment.registerTable("t2", table1);
// NOTE(review): the reported CodeGenException ("Invalid input access") appears
// only when this WHERE predicate is present — the planner seems to push the
// filter into the correlate's generated collector. Looks like a known
// limitation of this Flink version's correlate code generation; consider
// upgrading Flink or filtering on the DataStream side — TODO confirm.
Table table = tableEnvironment.sqlQuery("select maps,map_length from t2 where map_length=87 ");
// Raw TypeInformation kept deliberately: toRowType() yields TypeInformation<Row>,
// while the mapped stream is declared as DataStream<String>; parameterizing it
// here would require widening the surrounding declarations as well.
TypeInformation typeInformation = table.getSchema().toRowType();
String[] columns = table.getSchema().getFieldNames();
// Convert the (append-only) Table back to a DataStream and flatten each Row
// into a String via the project-local PhysicTransformMap.
DataStream<String> dataStream = tableEnvironment
    .toAppendStream(table, typeInformation)
    .map(new PhysicTransformMap(columns, 0));
dataStream.print();
try {
    env.execute();
} catch (Exception e) {
    // NOTE(review): prefer an SLF4J logger over printStackTrace, and consider
    // rethrowing — swallowing here hides job-submission failures.
    e.printStackTrace();
}
如果把table中的where条件去掉,程序ok,但是只要带上where条件,即不能正常运行
Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: Invalid input access.
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:587)
at org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:66)
at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
at org.apache.flink.table.plan.nodes.CommonCorrelate$class.generateCollector(CommonCorrelate.scala:155)
at org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.generateCollector(DataStreamCorrelate.scala:38)
at org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.translateToPlan(DataStreamCorrelate.scala:116)
at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:967)
at org.apache.flink.table.api.StreamTableEnvironment.explain(StreamTableEnvironment.scala:1002)
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。