开发者社区> 问答> 正文

tumbling的窗口更新随着job运行时间越长,delay越久,sliding不会,哪个地方错了?

大家好。

我用的tumbling window, ds.keyBy(CandleView::getMarketCode) .timeWindow(Time.minutes(5L))

.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1))) .aggregate(new OhlcAggregateFunction(), new OhlcWindowFunction()) .addSink(new PgSink(jdbcUrl, userName, password, candle_table_5m)) .name(candle_table_5m);

Sliding Window:

ds.keyBy(CandleView::getMarketCode) .timeWindow(Time.hours(24L), Time.seconds(2)) .aggregate(new OhlcAggregateFunction(), new TickerWindowFunction()) .addSink(new PgSink(jdbcUrl, userName, password, candle_table_24h)) .name(candle_table_24h);

一个是基于5分钟的窗口,一个是基于24小时的sliding窗口,24小时的窗口都已经update到了最新时间,但5分钟的滞后了越来越长时间,job运行不到2小时,已经滞后快20分钟,即将近4个窗口。 基于的是同一个dataStream

有没有什么建议,或者哪个地方用错了? 谢谢*来自志愿者整理的flink

展开
收起
毛毛虫雨 2021-12-05 21:37:35 500 0
1 条回答
写回答
取消 提交回答
  • Hi 你好 看到你是在 window 内一直使用 agg 累加的,所以可以使用 filesystem backend 加速,但是可能内存会相对耗的比较多。因为rocksdb backend的话,每一条数据都会有一次put 和 get 的 IO 操作,故会比较慢些。 至于你提到的为什么 24h size,2s slide 的窗口没有延迟,5 min,1s 的连续 trigger 缺延迟了。这两者的行为不一样,其实没有什么可比的。 对于第二种,trigger 是依靠 timer 注册触发的,这样的话每秒都需要进行触发(如果是 process time),这样可能会太密集了。*来自志愿者整理的flink

    2021-12-05 22:27:55
    赞同 展开评论 打赏
问答地址:
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载