
org.apache.flink.table.api.Validatio under the blink planner

tEnv.connect(new Kafka()
        .version("universal")
        .topic("xxx")
        .startFromLatest()
        .property("bootstrap.servers", "xxxx")
        .property("group.id", "xxxx"))
    .withFormat(new Json().failOnMissingField(false).deriveSchema())
    .withSchema(new Schema()
        // .field("logger_name", Types.STRING)
        // .field("host", Types.STRING)
        // .field("@timestamp", Types.SQL_TIMESTAMP)
        // .field("_rowtime", Types.SQL_TIMESTAMP)
        // .rowtime(
        //     new Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000))
        .field("doc", Types.POJO(Doc.class))
    )
    .inAppendMode()
    .registerTableSource("xxx");

Table result = tEnv.sqlQuery( "SELECT doc.xxx1, doc.xxx2, ... , doc.xxxN as seq FROM xxx");

// result.printSchema();
tEnv.toAppendStream(result, new TupleTypeInfo<>(
        STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING,
        BOOLEAN, LONG, STRING, STRING, STRING, STRING, STRING, STRING, STRING, LONG,
        STRING, INT, STRING, INT))
    .print();

The code above runs under the flink planner, but after switching to the blink planner it fails with the following error:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Type LEGACY(PojoType<com.sogou.qidian.Doc, fields = [(... // omitted)]>) of table field 'doc' does not match with type PojoType<com.sogou.qidian.Doc, fields = [(... // omitted)]> of the field 'doc' of the TableSource return type.
    at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
    at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:154)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:319)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:227)
    at com.sogou.qidian.BatchJob.main(BatchJob.java:83)

Execution failed for task ':BatchJob.main()'.

*From the Flink mailing list archive compiled by volunteers

彗星halation 2021-12-02 17:52:56
1 answer
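  • Which version is this? Could you provide the complete Doc class so that we can reproduce the issue? *From the Flink mailing list archive compiled by volunteers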

    2021-12-02 18:14:27
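
For anyone trying to reproduce this without the original class, the sketch below shows roughly what a stand-in Doc might look like. It is not the poster's real class: the field names xxx1 and xxx2 only mirror the placeholders in the query, and DocSketch and isPojoShaped are hypothetical names introduced here. The helper checks just two of the rules Flink applies to POJO types (a public class with a public no-argument constructor); it is a partial sanity check, not Flink's own type extraction logic.

```java
import java.lang.reflect.Modifier;

// Hypothetical sketch for reproduction purposes; not the poster's real class.
class DocSketch {

    // Placeholder POJO. Field names mirror the xxx1/xxx2 placeholders in the query.
    public static class Doc {
        // Fields must be public or reachable through getters and setters.
        public String xxx1;
        public String xxx2;

        // A public no-argument constructor is required for POJO types.
        public Doc() {}
    }

    // Partial check (assumption: only two of the POJO rules are tested here).
    static boolean isPojoShaped(Class<?> clazz) {
        if (!Modifier.isPublic(clazz.getModifiers())) {
            return false;
        }
        try {
            clazz.getConstructor(); // looks up a public no-argument constructor
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isPojoShaped(Doc.class));
    }
}
```

Posting the real class together with the Flink version, as requested above, would still be needed to confirm whether the mismatch between LEGACY(PojoType<...>) and PojoType<...> depends on the shape of Doc.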