开发者社区> 问答> 正文

flink sql sink mysql requires primary keys?

Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated. at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:113) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) at org.rabbit.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:68) at org.rabbit.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala)

*来自志愿者整理的flink邮件归档

展开
收起
游客nnqbtnagn7h6s 2021-12-06 19:58:44 1230 0
1 条回答
写回答
取消 提交回答
  • 在 Flink 1.10 中,sink 的 primary key 是从 query 推导的,如果 query 推导不出 pk 就会报你看到的错误 “UpsertStreamTableSink requires that Table has a full primary keys if it is updated.” 你的这个作业就是 query pk 推导不出来的 case。

    此外 DDL 上声明 PK 在1.10也是不支持的。

    这些问题,在 1.11 都解决了,可以尝试自己拿 release-1.11 分支编译下尝试下。 Flink 1.11 中,sink的 primary key 都是从 DDL 上用户显式声明出来的,不会去推导 query pk。

    *来自志愿者整理的flink邮件归档

    2021-12-06 21:26:50
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
SQL Server在电子商务中的应用与实践 立即下载
GeoMesa on Spark SQL 立即下载
原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载

相关镜像