大家好。
我用的tumbling window, ds.keyBy(CandleView::getMarketCode) .timeWindow(Time.minutes(5L))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1))) .aggregate(new OhlcAggregateFunction(), new OhlcWindowFunction()) .addSink(new PgSink(jdbcUrl, userName, password, candle_table_5m)) .name(candle_table_5m);
Sliding Window:
ds.keyBy(CandleView::getMarketCode) .timeWindow(Time.hours(24L), Time.seconds(2)) .aggregate(new OhlcAggregateFunction(), new TickerWindowFunction()) .addSink(new PgSink(jdbcUrl, userName, password, candle_table_24h)) .name(candle_table_24h);
一个是基于5分钟的窗口,一个是基于24小时的sliding窗口,24小时的窗口都已经update到了最新时间,但5分钟的滞后了越来越长时间,job运行不到2小时,已经滞后快20分钟,即将近4个窗口。 基于的是同一个dataStream
有没有什么建议,或者哪个地方用错了? 谢谢*来自志愿者整理的flink
Hi 你好 看到你是在 window 内一直使用 agg 累加的,所以可以使用 filesystem backend 加速,但是可能内存会相对耗的比较多。因为rocksdb backend的话,每一条数据都会有一次put 和 get 的 IO 操作,故会比较慢些。 至于你提到的为什么 24h size,2s slide 的窗口没有延迟,5 min,1s 的连续 trigger 缺延迟了。这两者的行为不一样,其实没有什么可比的。 对于第二种,trigger 是依靠 timer 注册触发的,这样的话每秒都需要进行触发(如果是 process time),这样可能会太密集了。*来自志愿者整理的flink
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。