开发者社区> 问答> 正文

TumblingProcessingTimeTumblingProcessing

接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中

keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new ListAggregate()).addSink(new TemplateMySQLSink());

ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次

但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作

有什么方式让一个窗口只做一次 aggregate 操作吗? *来自志愿者整理的flink邮件归档

展开
收起
游客sadna6pkvqnz6 2021-12-07 17:23:30 784 0
1 条回答
写回答
取消 提交回答
  • 依据我的理解,aggregate是增量计算的,在1s窗口触发后,只会一次sink。将一段消息merge成List,可使用ProcessWindowFunction。*来自志愿者整理的flink

    2021-12-07 20:56:22
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载