开发者社区 问答 正文

TumblingProcessingTimeTumblingProcessing

接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中

keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new ListAggregate()).addSink(new TemplateMySQLSink());

ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次

但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作

有什么方式让一个窗口只做一次 aggregate 操作吗? *来自志愿者整理的flink邮件归档

展开
收起
游客sadna6pkvqnz6 2021-12-07 17:23:30 788 分享
分享
版权
举报
1 条回答
写回答
取消 提交回答
  • 依据我的理解,aggregate是增量计算的,在1s窗口触发后,只会一次sink。将一段消息merge成List,可使用ProcessWindowFunction。*来自志愿者整理的flink

    2021-12-07 20:56:22 举报
    赞同 评论

    评论

    全部评论 (0)

    登录后可评论
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等