我有一个窗口小时聚合的DataStream。
DataStream ds = .....
SingleOutputStreamOperator hourly =
ds.keyBy(HourlyCountersAggregation.KEY_SELECTOR)
.timeWindow(Time.hours(1))
.aggregate(new HourlyCountersAggregation());
这个DataStream每小时都会接收到Cassandra。
我也想要每天聚合相同的DataStream 。
建议的方法是从每小时聚合的DataStream执行此操作,以避免从DataStream ds 保持大的每日流式传输状态....
请问,目前需求想使用flink进行每周的数据聚合,但目前时间方面只支持到hour级别,请问有自定义实现的接口吗
我想您可能会尝试分配作业图,以便在接收器和下一个窗口中使用每小时聚合数据的输出(这样您就可以在每小时聚合之上进行每日聚合,从而节省计算时间)。就像是:
SingleOutputStreamOperator hourly = ds.keyBy(HourlyCountersAggregation.KEY_SELECTOR)
.timeWindow(Time.hours(1))
.aggregate(new HourlyCountersAggregation());
hourly.addSink(...);
SingleOutputStreamOperator daily = hourly.timeWindow(Time.hours(24))
.trigger(CountEventTrigger.of(24))
.aggregate(...);
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。