开发者社区> 问答> 正文

使用Flink 1.10 blink planner写ES的异常问题

Hi, 我使用Flink 1.10,开启了Blink Planner,在尝试写入ES且使用UpsertMode时(sql就是insert into table select xxx group by xxxxx),抛出了如下异常: 我通过DDL尝试定义一个ESTableSink,且声名primary key时,运行时又说Primary key和unique key目前不支持。。那这就是个悖论啊。。真的不科学。

关键问题:我切换回使用老的planner时,是没问题的。。这可能是Blink Planner的bug么?真心请教。

org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated. at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:114) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48) at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) at com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:82) at com.htsc.crm_realtime.fatjob.Jobs.ZhangleClientRealtimeIndicatorJob.ZhangleClientComputeTask.doJob(ZhangleClientComputeTask.java:80) at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:50) at com.htsc.crm_realtime.fatjob.Jobs.ZhangleClientRealtimeIndicatorJob.ZhangleClientComputeTask.main(ZhangleClientComputeTask.java:27)*来自志愿者整理的flink邮件归档

展开
收起
塔塔塔塔塔塔 2021-12-02 17:55:20 1108 0
1 条回答
写回答
取消 提交回答
  • 这个异常是说通过 query 推断不出 query 的 primary key,不是说 sink 没有 primary key。至于为什么 query 推断不出 pk,可能要结合 query 看一下。 *来自志愿者整理的FLINK邮件归档

    2021-12-02 18:11:13
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载