开发者社区> 问答> 正文

滑动时间窗口的Flink性能问题

我正在尝试使用一些网络监视器工作。我的目标是不同的计数dst_ip每src_ip。

我的以下代码有效,但性能非常糟糕。似乎每个滑动窗口都会重新计算所有事件,但这不是必需的。

例如,我们有事件按时间秒1 - 600.Flink可以得到每秒的累加器,所以我们每秒有600个累加器。当第一个滑动窗口到期时,flink只合并1-300的累加器,并销毁第二个1的累加器。该窗口也可以在最后一秒之前预合并1-299。当第二个滑动窗口到期时,flink只合并2-301的累加器,并销毁第二个2的累加器。依此类推......

这种方式比将事件分配给多个窗口更有效,并计算每个窗口的聚合。

flink支持吗?我可以用flink自己获得类似的功能吗?

public static class AverageAccumulator2 {

String key;
Set<String> target;
AverageAccumulator2() {
    target = new HashSet<>();
}

}

public static class Average2 implements AggregateFunction>> {

@Override
public AverageAccumulator2 createAccumulator() {
    return new AverageAccumulator2();
}

@Override
public AverageAccumulator2 add(ObjectNode value, AverageAccumulator2 accumulator) {
    accumulator.key = value.get("value").get("src_ip").asText();
    accumulator.target.add(value.get("value").get("dst_ip").asText());
    return accumulator;
}
@Override
public Tuple3<String, Long, Set<String>> getResult(AverageAccumulator2 accumulator) {
    return new Tuple3<>(accumulator.key, (long) accumulator.target.size(), accumulator.target);
}

@Override
public AverageAccumulator2 merge(AverageAccumulator2 a, AverageAccumulator2 b) {
    a.target.addAll(b.target);
    return a;
}

}

final SingleOutputStreamOperator> > process2 =
stream.keyBy(value -> value.get("value").get("sip").asText())

                .timeWindow(Time.seconds(300),Time.seconds(1))
                .aggregate(new Average2());

展开
收起
社区小助手 2018-12-11 16:22:51 4152 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    如您所见,Flink不会尝试优化滑动窗口。通过细粒度滑动,这确实变得非常昂贵。

    你可以做的是使用ProcessFunction实现你自己的逻辑来处理状态和定时器- 你可以像你概述的那样实现它。您将拥有一个processElement方法,该方法为每个传入记录更新您将用于累积结果的数据结构,以及每秒触发一次的onTimer方法,将部分结果合并在一起,并将结果发送到下游。

    2019-07-17 23:19:51
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载