
Trying to GROUP BY A fails with an error that a ROW type cannot be used in GROUP BY. How should this be handled?

I'd like to ask a question: I defined a column A of type Row<c1 varchar, c2 varchar> in a sink table. When writing out through Flink SQL, I tried to GROUP BY A and got an error saying a ROW type cannot be used in GROUP BY. How should this be handled?

Using Flink 1.10.1 with the Blink planner.
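For context, here is a minimal sketch of the setup described above. The table names and connector options are assumed for illustration; the ROW field names commentId and commentContent are taken from the error message below (the question's c1/c2), and a source table T with those two VARCHAR columns is assumed to be registered:

-- Hypothetical sink table with a ROW-typed column A (connector options illustrative only)
CREATE TABLE es_sink (
  A ROW<commentId VARCHAR, commentContent VARCHAR>
) WITH (
  'connector.type' = 'elasticsearch',
  'connector.version' = '6',
  'connector.hosts' = 'http://localhost:9200',
  'connector.index' = 'comments',
  'connector.document-type' = '_doc',
  'update-mode' = 'upsert',
  'format.type' = 'json'
);

-- Fails during sink translation: the derived upsert key A is a ROW, not a simple type
INSERT INTO es_sink
SELECT A
FROM (SELECT ROW(commentId, commentContent) AS A FROM T)
GROUP BY A;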

org.apache.flink.table.api.ValidationException: Only simple types that can be safely converted into a string representation can be used as keys. But was: Row(commentId: String, commentContent: String)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.validateKeyTypes(ElasticsearchUpsertTableSinkBase.java:310)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.setKeyFields(ElasticsearchUpsertTableSinkBase.java:152)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:111)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
    at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
    at com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:93)
    at com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.doJob(ArotaLiveZLCFTRealtimeJob.java:42)
    at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:65)
    at com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.main(ArotaLiveZLCFTRealtimeJob.java:47)

*From the volunteer-curated Flink mailing-list archive

毛毛虫雨 2021-12-06 16:22:18
1 Answer
  • The exception message is quite informative: GROUP BY ROW is supported in Flink SQL; it is just not supported when writing to the ElasticSearchUpsertSink. The reason is that the ElasticSearchUpsertSink has to convert the GROUP BY key fields into a String in order to construct the UpdateRequest and DeleteRequest objects; more precisely, the String built from the key fields becomes the id field in Elasticsearch, which is a String. So the current ElasticSearchUpsertSink implementation only supports simple types as keys; composite types are not supported, because the result of calling toString() on a composite type may not be what we want.

    You can try the query below. Its keys map to the Elasticsearch id as commentId${keyDelimiter}commentContent, which should also be the result you need:

    SELECT ROW(commentId, commentContent) FROM T GROUP BY commentId, commentContent
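    Spelled out against the hypothetical es_sink/T tables sketched in the question above (those names are assumptions; only the ROW(...)/GROUP BY rewrite itself comes from this answer), the full statement would look like:

    -- Group by the simple-typed fields, then reassemble the ROW in the SELECT;
    -- the upsert keys commentId and commentContent form the Elasticsearch id
    -- as commentId${keyDelimiter}commentContent
    INSERT INTO es_sink
    SELECT ROW(commentId, commentContent)
    FROM T
    GROUP BY commentId, commentContent;

    This way each GROUP BY key is a simple type the sink can safely stringify, while the sink column still receives the composite ROW value.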

    *From the volunteer-curated Flink mailing-list archive

    2021-12-06 16:56:02