source是kafka,有一个rowtime定义:
.field("rowtime", DataTypes.TIMESTAMP(0)) .rowtime(Rowtime() .timestamps_from_field("actionTime") .watermarks_periodic_bounded(60000) )
有两个sink,第一个sink是直接把kafa的数据保存到postgres。 第二个sink是定义一个1小时的tumble window,然后定义了一个udf,udf里面去查询第一个sink保存的数据。 st_env.scan("source")
.window(Tumble.over("1.hour").on("rowtime").alias("hourlywindow"))
.group_by("hourlywindow")
.select("udf(...)") ...
现在的问题是:第二个sink的tumble window触发的时候,数据库里面的数据已经保存了下一个小时的数据了。
有什么办法让tumble window在一个小时结束后马上触发?现在观察的是需要下一个小时的数据来了,才能触发上一个小时的窗口。
*来自志愿者整理的flink邮件归档
watermark 的计算是跟数据上的 event-time 相关的。你的数据是不是间隔一小时来一波的呢?
比如 10:00 的数据之后,就是 11:00 的数据,但是要1小时后才到来?
*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。