开发者社区> 问答> 正文

FlinkSQL CDC 窗口分组聚合求助

我使用CDC组件debezium将MySQL中的维表数据实时发送到kafka,然后使用flinkSQL消费kafka的数据,具体流程如下: [image: image.png] [image: image.png] 分组计算的SQL如下: [image: image.png] 在执行计算时,报了如下异常: Exception in thread "main" org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, t_order, watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/($1, 1000))), 3000:INTERVAL SECOND)]]], fields=[id, timestamps, orderInformationId, userId, categoryId, productId, price, productCount, priceSum, shipAddress, receiverAddress]) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:380) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:298) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.immutable.Range.foreach(Range.scala:155) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:325) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:275) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)

我的初步理解是FlinkSQL 不支持这样CDC这样由插入、更新、删除的流的分组聚合。 那面对我这样的情况,该用什么方案来解决? 望知道的各位告知一下,感谢!*来自志愿者整理的flink邮件归档

展开
收起
小阿矿 2021-12-03 16:45:47 1368 0
1 条回答
写回答
取消 提交回答
  • Flink SQL 的 window agg 目前不支持输入含有更新和删除消息。 你可以使用非 window 聚合来代替。

    Btw,你可能说一下你的需求场景么? 为什么需要在 CDC 上做 window 操作呢?

    *来自志愿者整理的flink邮件归档

    2021-12-06 11:24:52
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Apache Flink 流式应用中状态的数据结构定义升级 立即下载
低代码开发师(初级)实战教程 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载