开发者社区> 问答> 正文

一个source多个sink的同步问题

source是kafka,有一个rowtime定义:

.field("rowtime", DataTypes.TIMESTAMP(0)) .rowtime(Rowtime() .timestamps_from_field("actionTime") .watermarks_periodic_bounded(60000) )

有两个sink,第一个sink是直接把kafa的数据保存到postgres。 第二个sink是定义一个1小时的tumble window,然后定义了一个udf,udf里面去查询第一个sink保存的数据。 st_env.scan("source")
.window(Tumble.over("1.hour").on("rowtime").alias("hourlywindow"))
.group_by("hourlywindow")
.select("udf(...)") ...

现在的问题是:第二个sink的tumble window触发的时候,数据库里面的数据已经保存了下一个小时的数据了。

有什么办法让tumble window在一个小时结束后马上触发?现在观察的是需要下一个小时的数据来了,才能触发上一个小时的窗口。

*来自志愿者整理的flink邮件归档

展开
收起
小阿矿 2021-12-06 15:25:35 595 0
1 条回答
写回答
取消 提交回答
  • watermark 的计算是跟数据上的 event-time 相关的。你的数据是不是间隔一小时来一波的呢?

    比如 10:00 的数据之后,就是 11:00 的数据,但是要1小时后才到来?

    *来自志愿者整理的flink邮件归档

    2021-12-06 17:19:51
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
BUILDING REALTIME DATA PIPELINES WITH KAFKA CONNECT AND SPARK STREAMING 立即下载
An Online Spark Pipeline 立即下载
Get rid of traditional ETL, Move to Spark! 立即下载