假设我们有这样的数据结构:
Tuple2, Integer>
第一个字段是ArrayList长度为1的字段,其中包含一个时间戳,而Integer字段是一个1到40之间的数字channel。目标是使用相同的密钥(channel)聚合每400条消息并应用ReduceFunction它们(它只合并元组的第一个字段中的400条消息的时间戳)。我将channel字段设置为消息的键,并创建一个400的计数窗口。例如,如果我们有160000个消息作为输入,它应该输出160000/400 = 400行,并且计数窗口按照需要工作。问题是当我使用Sliding Count窗口时,我的预期行为是:
Flink为每个channel数字创建逻辑窗口,并ReduceFunction 在第一次应用时,如果逻辑窗口的长度达到400,那么每100个输入数据(与逻辑窗口的密钥相同的密钥)将调用ReduceFunction最后400个消息。窗口,所以我们应该:
160000 - 400 = 159600// 第一个400输入将首次调用reduce函数
159600 / 100 = 1596 // 在第一个400输入之后,每100个输入Flink调用最后400个输入的reduce函数
1 + 1596 = 1597 //输出的行数
但是运行Sliding Count窗口,它输出1600行,其长度可变。(我预计输出长度只有400)
要点:说长度我的意思是ArrayList的大小(Tuple2的第一个字段)
前40个频道 - >长度100
第二个40通道 - >长度为299
第三个40通道 - >长度为598
第四个40通道 - >长度为997
遗体40通道 - >长度400
我如何证明这种行为并实现我想要的滑动计数窗口?
这是源代码:
DataStream, Integer>> data ;
data.keyBy(1).countWindow(400, 100)
.reduce(new ReduceFunction<Tuple2<ArrayList<Long>, Integer>>() {
@Override
public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0, Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
t0.f0.add(t1.f0.get(0));
return t0;
}
}).writeAsText("results400").setParallelism(1);
更新:根据@DavidAnderson的建议,我也尝试在ReduceFunstion而不是修改中创建一个新的元组t0,但它产生了相同的输出。
public Tuple2, Integer> reduce(Tuple2, Integer> t0, Tuple2, Integer> t1) throws Exception {
ArrayList<Long> times = t0.f0;
times.addAll(t1.f0);
return new Tuple2<>(times, t0.f1) ;
}
这是countWindow的实现
public WindowedStream countWindow(long size, long slide) {
return window(GlobalWindows.create())
.evictor(CountEvictor.of(size))
.trigger(CountTrigger.of(slide));
}
它的表现与你期望的完全不同。每100个元素(幻灯片)触发窗口,无论它是否包含400个元素(大小)。大小控制最多保留多少元素。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。