Flink WindowAssigner 源码解析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: Flink 窗口的时候有没有想过数据是怎么被划分到窗口里面的? 它是根据什么规则划分的? 相信看完这篇文章你就明白了.@PublicEvolvingpublic <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) { return new WindowedStream<>(this, assigner);}

当你在使用 Flink 窗口的时候有没有想过数据是怎么被划分到窗口里面的? 它是根据什么规则划分的? 相信看完这篇文章你就明白了.


@PublicEvolving
public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
   return new WindowedStream<>(this, assigner);
}


当有数据流入到 Window Operator 时需要按照一定规则将数据分配给窗口,WindowAssigner 为数据分配窗口。在新版本里已经把 timeWindow 标记为弃用状态,统一改成了 window 方法,该方法接收的输入是一个 WindowAssigner, WindowAssigner  负责将每条输入的数据分发到正确的 window 中(一条数据可能同时分发到多个 Window 中),Flink 提供了几种通用的 WindowAssigner:tumbling window(窗口间的元素无重复),sliding window(窗口间的元素可能重复),session window 以及 global window。比如 TumblingEventTimeWindows 就是一个基于 eventtime 时间语义的滚动窗口.如果需要自己定制数据分发策略,则可以实现一个 class,继承自 WindowAssigner。


我们先来看一下 WindowAssigner 类的源码如下:


/**
 * A {@code WindowAssigner} assigns zero or more {@link Window Windows} to an element.
 *
 * <p>In a window operation, elements are grouped by their key (if available) and by the windows to
 * which it was assigned. The set of elements with the same key and window is called a pane.
 * When a {@link Trigger} decides that a certain pane should fire the
 * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} is applied
 * to produce output elements for that pane.
 *
 * @param <T> The type of elements that this WindowAssigner can assign windows to.
 * @param <W> The type of {@code Window} that this assigner assigns.
 */
@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
   private static final long serialVersionUID = 1L;
   /**
    * Returns a {@code Collection} of windows that should be assigned to the element.
    *
    * @param element The element to which windows should be assigned.
    * @param timestamp The timestamp of the element.
    * @param context The {@link WindowAssignerContext} in which the assigner operates.
    */
   public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
   /**
    * Returns the default trigger associated with this {@code WindowAssigner}.
    */
   public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
   /**
    * Returns a {@link TypeSerializer} for serializing windows that are assigned by
    * this {@code WindowAssigner}.
    */
   public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
   /**
    * Returns {@code true} if elements are assigned to windows based on event time,
    * {@code false} otherwise.
    */
   public abstract boolean isEventTime();
   /**
    * A context provided to the {@link WindowAssigner} that allows it to query the
    * current processing time.
    *
    * <p>This is provided to the assigner by its containing
    * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},
    * which, in turn, gets it from the containing
    * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
    */
   public abstract static class WindowAssignerContext {
      /**
       * Returns the current processing time.
       */
      public abstract long getCurrentProcessingTime();
   }
}


这是一个抽象类主要有 4 个方法,简单说一下每个方法的作用:


assignWindows 将某个带有时间戳timestamp的元素element分配给一个或多个窗口,并返回窗口集合


getDefaultTrigger 返回WindowAssigner默认的 trigger


getWindowSerializer 返回一个类型序列化器用来序列化窗口


isEventTime 是否是 event time


然后再来看一下 WindowAssigner 的实现类 UML 图,如下所示:


image-20210316223805



windowAssigner


这里主要展示了 eventime 语义的, 可以看出 WindowAssigner 有 4 种不同的类型:


Tumbling windows


Sliding windows


Session windows


Global windows


接下来看一下大家用的比较多的 TumblingEventTimeWindows 和 SlidingEventTimeWindows 的源码(processing time 的实现类似) 看下窗口的划分到底是怎么实现的?


TumblingEventTimeWindows 源码


@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
   if (timestamp > Long.MIN_VALUE) {
      if (staggerOffset == null) {
         staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
      }
      // Long.MIN_VALUE is currently assigned when no timestamp is present
      long start = TimeWindow.getWindowStartWithOffset(timestamp, (globalOffset + staggerOffset) % size, size);
      return Collections.singletonList(new TimeWindow(start, start + size));
   } else {
      throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
            "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
            "'DataStream.assignTimestampsAndWatermarks(...)'?");
   }
}


元素的时间戳肯定是大于 Long.MIN_VALUE 的,所以会走到 if 里面 staggerOffset 默认值是空的,所以会先初始化(这个是一个新特性为了解决同一时间触发大量的窗口计算造成的性能问题),然后根据 timestamp 和 size 计算出窗口的开始时间,最后返回一个存储 TimeWindow 的单例集合.


SlidingEventTimeWindows 源码


@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
   if (timestamp > Long.MIN_VALUE) {
      List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
      long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
      for (long start = lastStart;
         start > timestamp - size;
         start -= slide) {
         windows.add(new TimeWindow(start, start + size));
      }
      return windows;
   } else {
      throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
            "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
            "'DataStream.assignTimestampsAndWatermarks(...)'?");
   }
}


滑动窗口跟上面的滚动窗口最大的不同是数据不是分配到一个窗口,而是分配到 size / slide 个不同的窗口里面,返回的是窗口的集合.


/**
  * Method to get the window start for a timestamp.
  *
  * @param timestamp epoch millisecond to get the window start.
  * @param offset The offset which window start would be shifted by.
  * @param windowSize The size of the generated windows.
  * @return window start
  */
 public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
  return timestamp - (timestamp - offset + windowSize) % windowSize;
 }


首先会根据元素的 timestamp offset slide 计算出窗口开始的时间戳,然后循环初始化给定的size内不同slide的窗口对象,最后返回一个 List


Session windows 和 Global windows 的实现相对简单这里就不在展开分析了,感兴趣的同学可以自己去看一下.


总结


这篇文章主要解析了 Window Assigner 的实现原理,结合滚动窗口和滑动窗口的源码分析了具体的实现过程.让大家对窗口的划分有更加深入的理解.

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
14 2
|
1月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
63 3
|
3天前
|
存储 安全 Linux
Golang的GMP调度模型与源码解析
【11月更文挑战第11天】GMP 调度模型是 Go 语言运行时系统的核心部分,用于高效管理和调度大量协程(goroutine)。它通过少量的操作系统线程(M)和逻辑处理器(P)来调度大量的轻量级协程(G),从而实现高性能的并发处理。GMP 模型通过本地队列和全局队列来减少锁竞争,提高调度效率。在 Go 源码中,`runtime.h` 文件定义了关键数据结构,`schedule()` 和 `findrunnable()` 函数实现了核心调度逻辑。通过深入研究 GMP 模型,可以更好地理解 Go 语言的并发机制。
|
16天前
|
消息中间件 缓存 安全
Future与FutureTask源码解析,接口阻塞问题及解决方案
【11月更文挑战第5天】在Java开发中,多线程编程是提高系统并发性能和资源利用率的重要手段。然而,多线程编程也带来了诸如线程安全、死锁、接口阻塞等一系列复杂问题。本文将深度剖析多线程优化技巧、Future与FutureTask的源码、接口阻塞问题及解决方案,并通过具体业务场景和Java代码示例进行实战演示。
36 3
|
1月前
|
存储
让星星⭐月亮告诉你,HashMap的put方法源码解析及其中两种会触发扩容的场景(足够详尽,有问题欢迎指正~)
`HashMap`的`put`方法通过调用`putVal`实现,主要涉及两个场景下的扩容操作:1. 初始化时,链表数组的初始容量设为16,阈值设为12;2. 当存储的元素个数超过阈值时,链表数组的容量和阈值均翻倍。`putVal`方法处理键值对的插入,包括链表和红黑树的转换,确保高效的数据存取。
53 5
|
1月前
|
Java Spring
Spring底层架构源码解析(三)
Spring底层架构源码解析(三)
108 5
|
1月前
|
XML Java 数据格式
Spring底层架构源码解析(二)
Spring底层架构源码解析(二)
|
1月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
66 0
|
1月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
52 0
|
1月前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
60 0

推荐镜像

更多