Flink WindowAssigner 源码解析

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Flink 窗口的时候有没有想过数据是怎么被划分到窗口里面的? 它是根据什么规则划分的? 相信看完这篇文章你就明白了.@PublicEvolvingpublic <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) { return new WindowedStream<>(this, assigner);}

当你在使用 Flink 窗口的时候有没有想过数据是怎么被划分到窗口里面的? 它是根据什么规则划分的? 相信看完这篇文章你就明白了.


@PublicEvolving
public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
   return new WindowedStream<>(this, assigner);
}


当有数据流入到 Window Operator 时需要按照一定规则将数据分配给窗口,WindowAssigner 为数据分配窗口。在新版本里已经把 timeWindow 标记为弃用状态,统一改成了 window 方法,该方法接收的输入是一个 WindowAssigner, WindowAssigner  负责将每条输入的数据分发到正确的 window 中(一条数据可能同时分发到多个 Window 中),Flink 提供了几种通用的 WindowAssigner:tumbling window(窗口间的元素无重复),sliding window(窗口间的元素可能重复),session window 以及 global window。比如 TumblingEventTimeWindows 就是一个基于 eventtime 时间语义的滚动窗口.如果需要自己定制数据分发策略,则可以实现一个 class,继承自 WindowAssigner。


我们先来看一下 WindowAssigner 类的源码如下:


/**
 * A {@code WindowAssigner} assigns zero or more {@link Window Windows} to an element.
 *
 * <p>In a window operation, elements are grouped by their key (if available) and by the windows to
 * which it was assigned. The set of elements with the same key and window is called a pane.
 * When a {@link Trigger} decides that a certain pane should fire the
 * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} is applied
 * to produce output elements for that pane.
 *
 * @param <T> The type of elements that this WindowAssigner can assign windows to.
 * @param <W> The type of {@code Window} that this assigner assigns.
 */
@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
   private static final long serialVersionUID = 1L;
   /**
    * Returns a {@code Collection} of windows that should be assigned to the element.
    *
    * @param element The element to which windows should be assigned.
    * @param timestamp The timestamp of the element.
    * @param context The {@link WindowAssignerContext} in which the assigner operates.
    */
   public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
   /**
    * Returns the default trigger associated with this {@code WindowAssigner}.
    */
   public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
   /**
    * Returns a {@link TypeSerializer} for serializing windows that are assigned by
    * this {@code WindowAssigner}.
    */
   public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
   /**
    * Returns {@code true} if elements are assigned to windows based on event time,
    * {@code false} otherwise.
    */
   public abstract boolean isEventTime();
   /**
    * A context provided to the {@link WindowAssigner} that allows it to query the
    * current processing time.
    *
    * <p>This is provided to the assigner by its containing
    * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},
    * which, in turn, gets it from the containing
    * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
    */
   public abstract static class WindowAssignerContext {
      /**
       * Returns the current processing time.
       */
      public abstract long getCurrentProcessingTime();
   }
}


这是一个抽象类主要有 4 个方法,简单说一下每个方法的作用:


assignWindows 将某个带有时间戳timestamp的元素element分配给一个或多个窗口,并返回窗口集合


getDefaultTrigger 返回WindowAssigner默认的 trigger


getWindowSerializer 返回一个类型序列化器用来序列化窗口


isEventTime 是否是 event time


然后再来看一下 WindowAssigner 的实现类 UML 图,如下所示:


image-20210316223805



windowAssigner


这里主要展示了 eventime 语义的, 可以看出 WindowAssigner 有 4 种不同的类型:


Tumbling windows


Sliding windows


Session windows


Global windows


接下来看一下大家用的比较多的 TumblingEventTimeWindows 和 SlidingEventTimeWindows 的源码(processing time 的实现类似) 看下窗口的划分到底是怎么实现的?


TumblingEventTimeWindows 源码


@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
   if (timestamp > Long.MIN_VALUE) {
      if (staggerOffset == null) {
         staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
      }
      // Long.MIN_VALUE is currently assigned when no timestamp is present
      long start = TimeWindow.getWindowStartWithOffset(timestamp, (globalOffset + staggerOffset) % size, size);
      return Collections.singletonList(new TimeWindow(start, start + size));
   } else {
      throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
            "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
            "'DataStream.assignTimestampsAndWatermarks(...)'?");
   }
}


元素的时间戳肯定是大于 Long.MIN_VALUE 的,所以会走到 if 里面 staggerOffset 默认值是空的,所以会先初始化(这个是一个新特性为了解决同一时间触发大量的窗口计算造成的性能问题),然后根据 timestamp 和 size 计算出窗口的开始时间,最后返回一个存储 TimeWindow 的单例集合.


SlidingEventTimeWindows 源码


@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
   if (timestamp > Long.MIN_VALUE) {
      List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
      long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
      for (long start = lastStart;
         start > timestamp - size;
         start -= slide) {
         windows.add(new TimeWindow(start, start + size));
      }
      return windows;
   } else {
      throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
            "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
            "'DataStream.assignTimestampsAndWatermarks(...)'?");
   }
}


滑动窗口跟上面的滚动窗口最大的不同是数据不是分配到一个窗口,而是分配到 size / slide 个不同的窗口里面,返回的是窗口的集合.


/**
  * Method to get the window start for a timestamp.
  *
  * @param timestamp epoch millisecond to get the window start.
  * @param offset The offset which window start would be shifted by.
  * @param windowSize The size of the generated windows.
  * @return window start
  */
 public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
  return timestamp - (timestamp - offset + windowSize) % windowSize;
 }


首先会根据元素的 timestamp offset slide 计算出窗口开始的时间戳,然后循环初始化给定的size内不同slide的窗口对象,最后返回一个 List


Session windows 和 Global windows 的实现相对简单这里就不在展开分析了,感兴趣的同学可以自己去看一下.


总结


这篇文章主要解析了 Window Assigner 的实现原理,结合滚动窗口和滑动窗口的源码分析了具体的实现过程.让大家对窗口的划分有更加深入的理解.

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
7月前
|
算法 测试技术 C语言
深入理解HTTP/2:nghttp2库源码解析及客户端实现示例
通过解析nghttp2库的源码和实现一个简单的HTTP/2客户端示例,本文详细介绍了HTTP/2的关键特性和nghttp2的核心实现。了解这些内容可以帮助开发者更好地理解HTTP/2协议,提高Web应用的性能和用户体验。对于实际开发中的应用,可以根据需要进一步优化和扩展代码,以满足具体需求。
675 29
|
7月前
|
前端开发 数据安全/隐私保护 CDN
二次元聚合短视频解析去水印系统源码
二次元聚合短视频解析去水印系统源码
192 4
|
7月前
|
JavaScript 算法 前端开发
JS数组操作方法全景图,全网最全构建完整知识网络!js数组操作方法全集(实现筛选转换、随机排序洗牌算法、复杂数据处理统计等情景详解,附大量源码和易错点解析)
这些方法提供了对数组的全面操作,包括搜索、遍历、转换和聚合等。通过分为原地操作方法、非原地操作方法和其他方法便于您理解和记忆,并熟悉他们各自的使用方法与使用范围。详细的案例与进阶使用,方便您理解数组操作的底层原理。链式调用的几个案例,让您玩转数组操作。 只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
7月前
|
移动开发 前端开发 JavaScript
从入门到精通:H5游戏源码开发技术全解析与未来趋势洞察
H5游戏凭借其跨平台、易传播和开发成本低的优势,近年来发展迅猛。接下来,让我们深入了解 H5 游戏源码开发的技术教程以及未来的发展趋势。
|
7月前
|
存储 前端开发 JavaScript
在线教育网课系统源码开发指南:功能设计与技术实现深度解析
在线教育网课系统是近年来发展迅猛的教育形式的核心载体,具备用户管理、课程管理、教学互动、学习评估等功能。本文从功能和技术两方面解析其源码开发,涵盖前端(HTML5、CSS3、JavaScript等)、后端(Java、Python等)、流媒体及云计算技术,并强调安全性、稳定性和用户体验的重要性。
|
7月前
|
负载均衡 JavaScript 前端开发
分片上传技术全解析:原理、优势与应用(含简单实现源码)
分片上传通过将大文件分割成多个小的片段或块,然后并行或顺序地上传这些片段,从而提高上传效率和可靠性,特别适用于大文件的上传场景,尤其是在网络环境不佳时,分片上传能有效提高上传体验。 博客不应该只有代码和解决方案,重点应该在于给出解决方案的同时分享思维模式,只有思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
8月前
|
机器学习/深度学习 自然语言处理 算法
生成式 AI 大语言模型(LLMs)核心算法及源码解析:预训练篇
生成式 AI 大语言模型(LLMs)核心算法及源码解析:预训练篇
1478 0
|
2月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
387 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。

推荐镜像

更多
  • DNS