Flink WindowAssigner 源码解析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink 窗口的时候有没有想过数据是怎么被划分到窗口里面的? 它是根据什么规则划分的? 相信看完这篇文章你就明白了.@PublicEvolvingpublic <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) { return new WindowedStream<>(this, assigner);}

当你在使用 Flink 窗口的时候有没有想过数据是怎么被划分到窗口里面的? 它是根据什么规则划分的? 相信看完这篇文章你就明白了.


@PublicEvolving
public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
   return new WindowedStream<>(this, assigner);
}


当有数据流入到 Window Operator 时需要按照一定规则将数据分配给窗口,WindowAssigner 为数据分配窗口。在新版本里已经把 timeWindow 标记为弃用状态,统一改成了 window 方法,该方法接收的输入是一个 WindowAssigner, WindowAssigner  负责将每条输入的数据分发到正确的 window 中(一条数据可能同时分发到多个 Window 中),Flink 提供了几种通用的 WindowAssigner:tumbling window(窗口间的元素无重复),sliding window(窗口间的元素可能重复),session window 以及 global window。比如 TumblingEventTimeWindows 就是一个基于 eventtime 时间语义的滚动窗口.如果需要自己定制数据分发策略,则可以实现一个 class,继承自 WindowAssigner。


我们先来看一下 WindowAssigner 类的源码如下:


/**
 * A {@code WindowAssigner} assigns zero or more {@link Window Windows} to an element.
 *
 * <p>In a window operation, elements are grouped by their key (if available) and by the windows to
 * which it was assigned. The set of elements with the same key and window is called a pane.
 * When a {@link Trigger} decides that a certain pane should fire the
 * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} is applied
 * to produce output elements for that pane.
 *
 * @param <T> The type of elements that this WindowAssigner can assign windows to.
 * @param <W> The type of {@code Window} that this assigner assigns.
 */
@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
   private static final long serialVersionUID = 1L;
   /**
    * Returns a {@code Collection} of windows that should be assigned to the element.
    *
    * @param element The element to which windows should be assigned.
    * @param timestamp The timestamp of the element.
    * @param context The {@link WindowAssignerContext} in which the assigner operates.
    */
   public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
   /**
    * Returns the default trigger associated with this {@code WindowAssigner}.
    */
   public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
   /**
    * Returns a {@link TypeSerializer} for serializing windows that are assigned by
    * this {@code WindowAssigner}.
    */
   public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
   /**
    * Returns {@code true} if elements are assigned to windows based on event time,
    * {@code false} otherwise.
    */
   public abstract boolean isEventTime();
   /**
    * A context provided to the {@link WindowAssigner} that allows it to query the
    * current processing time.
    *
    * <p>This is provided to the assigner by its containing
    * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},
    * which, in turn, gets it from the containing
    * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
    */
   public abstract static class WindowAssignerContext {
      /**
       * Returns the current processing time.
       */
      public abstract long getCurrentProcessingTime();
   }
}


这是一个抽象类主要有 4 个方法,简单说一下每个方法的作用:


assignWindows 将某个带有时间戳timestamp的元素element分配给一个或多个窗口,并返回窗口集合


getDefaultTrigger 返回WindowAssigner默认的 trigger


getWindowSerializer 返回一个类型序列化器用来序列化窗口


isEventTime 是否是 event time


然后再来看一下 WindowAssigner 的实现类 UML 图,如下所示:


image-20210316223805



windowAssigner


这里主要展示了 eventime 语义的, 可以看出 WindowAssigner 有 4 种不同的类型:


Tumbling windows


Sliding windows


Session windows


Global windows


接下来看一下大家用的比较多的 TumblingEventTimeWindows 和 SlidingEventTimeWindows 的源码(processing time 的实现类似) 看下窗口的划分到底是怎么实现的?


TumblingEventTimeWindows 源码


@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
   if (timestamp > Long.MIN_VALUE) {
      if (staggerOffset == null) {
         staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
      }
      // Long.MIN_VALUE is currently assigned when no timestamp is present
      long start = TimeWindow.getWindowStartWithOffset(timestamp, (globalOffset + staggerOffset) % size, size);
      return Collections.singletonList(new TimeWindow(start, start + size));
   } else {
      throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
            "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
            "'DataStream.assignTimestampsAndWatermarks(...)'?");
   }
}


元素的时间戳肯定是大于 Long.MIN_VALUE 的,所以会走到 if 里面 staggerOffset 默认值是空的,所以会先初始化(这个是一个新特性为了解决同一时间触发大量的窗口计算造成的性能问题),然后根据 timestamp 和 size 计算出窗口的开始时间,最后返回一个存储 TimeWindow 的单例集合.


SlidingEventTimeWindows 源码


@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
   if (timestamp > Long.MIN_VALUE) {
      List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
      long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
      for (long start = lastStart;
         start > timestamp - size;
         start -= slide) {
         windows.add(new TimeWindow(start, start + size));
      }
      return windows;
   } else {
      throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
            "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
            "'DataStream.assignTimestampsAndWatermarks(...)'?");
   }
}


滑动窗口跟上面的滚动窗口最大的不同是数据不是分配到一个窗口,而是分配到 size / slide 个不同的窗口里面,返回的是窗口的集合.


/**
  * Method to get the window start for a timestamp.
  *
  * @param timestamp epoch millisecond to get the window start.
  * @param offset The offset which window start would be shifted by.
  * @param windowSize The size of the generated windows.
  * @return window start
  */
 public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
  return timestamp - (timestamp - offset + windowSize) % windowSize;
 }


首先会根据元素的 timestamp offset slide 计算出窗口开始的时间戳,然后循环初始化给定的size内不同slide的窗口对象,最后返回一个 List


Session windows 和 Global windows 的实现相对简单这里就不在展开分析了,感兴趣的同学可以自己去看一下.


总结


这篇文章主要解析了 Window Assigner 的实现原理,结合滚动窗口和滑动窗口的源码分析了具体的实现过程.让大家对窗口的划分有更加深入的理解.

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1天前
PandasTA 源码解析(二十三)
PandasTA 源码解析(二十三)
40 0
|
1天前
PandasTA 源码解析(二十二)(3)
PandasTA 源码解析(二十二)
34 0
|
1天前
PandasTA 源码解析(二十二)(2)
PandasTA 源码解析(二十二)
39 2
|
1天前
PandasTA 源码解析(二十二)(1)
PandasTA 源码解析(二十二)
31 0
|
1天前
PandasTA 源码解析(二十一)(4)
PandasTA 源码解析(二十一)
22 1
|
1天前
PandasTA 源码解析(二十一)(3)
PandasTA 源码解析(二十一)
17 0
|
1天前
PandasTA 源码解析(二十一)(2)
PandasTA 源码解析(二十一)
25 1
|
Web App开发 监控 API
Flink技术源码解析(一):Flink概述与源码研读准备
一、前言 Apache Flink作为一款高吞吐量、低延迟的针对流数据和批数据的分布式实时处理引擎,是当前实时处理领域的一颗炙手可热的新星。关于Flink与其它主流实时大数据处理引擎Storm、Spark Streaming的不同与优势,可参考https://blog.csdn.net/cm_chenmin/article/details/53072498。 出于技术人对技术本能的好奇与冲动,
31657 0
|
1天前
|
消息中间件 API Apache
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
本文整理自阿里云开源大数据平台徐榜江 (雪尽),关于阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会。
1634 2
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
|
1天前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1639 2
官宣|Apache Flink 1.19 发布公告

推荐镜像

更多