"我有一个利用Windows的flink流媒体工作。
我的目标是id在一分钟内按内部收到的记录进行分组,然后仅按最新记录流式传输记录id。
我想出了两种可能的方法:
运用 reduce()
stream.keyBy(Record::getId)
.window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
.reduce((rec1, rec2) -> rec2);
这工作得很好,但似乎浪费,因为它要求每个和每一个记录。
运用 process()
stream.keyBy(Record::getId)
.window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
.process(new ProcessWindowFunction<Record, Object, Long, TimeWindow>() {
@Override
public void process(Long aLong, Context context, Iterable<Record> iterable, Collector<Object> collector) throws Exception {
Record last = null;
for (Record rec : iterable) {
if (last == null || last.getTimestamp() < rec.getTimestamp()) {
last = rec;
}
}
collector.collect(last);
}
});
这也可以正常工作。我曾预料到它会更快,但事实并非如此(它与解决方案1大致相同)。
你能推荐一个更好的方法吗?"
"你的解决方案1.似乎是最好的方法。
关于你评论:
这工作正常,但似乎浪费,因为它被称为每个记录。
问题是你不知道哪个记录是最后一个。因此,您始终需要存储上次查看的记录。由于a的结果ReduceFunction存储在状态中(无论是对于方法的下一次评估还是作为结果返回),这正是这里发生的事情。
你的解决方案2.实际上效率较低(从存储/内存的角度来看)。它会记住在一分钟内到达的所有记录,并在评估窗口时迭代所有记录。相反,解决方案1.仅存储单个值(最后一次功能评估的结果)。
你可以使用常规ProcessFunction和计时器实现解决方案,但是,我认为这不会比窗口+快得多ReduceFunction。而且,它需要更多的代码。"
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。