当你在使用 Flink 窗口的时候有没有想过数据是怎么被划分到窗口里面的? 它是根据什么规则划分的? 相信看完这篇文章你就明白了.
@PublicEvolving public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) { return new WindowedStream<>(this, assigner); }
当有数据流入到 Window Operator 时需要按照一定规则将数据分配给窗口,WindowAssigner 为数据分配窗口。在新版本里已经把 timeWindow 标记为弃用状态,统一改成了 window 方法,该方法接收的输入是一个 WindowAssigner, WindowAssigner 负责将每条输入的数据分发到正确的 window 中(一条数据可能同时分发到多个 Window 中),Flink 提供了几种通用的 WindowAssigner:tumbling window(窗口间的元素无重复),sliding window(窗口间的元素可能重复),session window 以及 global window。比如 TumblingEventTimeWindows 就是一个基于 eventtime 时间语义的滚动窗口.如果需要自己定制数据分发策略,则可以实现一个 class,继承自 WindowAssigner。
我们先来看一下 WindowAssigner 类的源码如下:
/** * A {@code WindowAssigner} assigns zero or more {@link Window Windows} to an element. * * <p>In a window operation, elements are grouped by their key (if available) and by the windows to * which it was assigned. The set of elements with the same key and window is called a pane. * When a {@link Trigger} decides that a certain pane should fire the * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} is applied * to produce output elements for that pane. * * @param <T> The type of elements that this WindowAssigner can assign windows to. * @param <W> The type of {@code Window} that this assigner assigns. */ @PublicEvolving public abstract class WindowAssigner<T, W extends Window> implements Serializable { private static final long serialVersionUID = 1L; /** * Returns a {@code Collection} of windows that should be assigned to the element. * * @param element The element to which windows should be assigned. * @param timestamp The timestamp of the element. * @param context The {@link WindowAssignerContext} in which the assigner operates. */ public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context); /** * Returns the default trigger associated with this {@code WindowAssigner}. */ public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env); /** * Returns a {@link TypeSerializer} for serializing windows that are assigned by * this {@code WindowAssigner}. */ public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig); /** * Returns {@code true} if elements are assigned to windows based on event time, * {@code false} otherwise. */ public abstract boolean isEventTime(); /** * A context provided to the {@link WindowAssigner} that allows it to query the * current processing time. * * <p>This is provided to the assigner by its containing * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator}, * which, in turn, gets it from the containing * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}. */ public abstract static class WindowAssignerContext { /** * Returns the current processing time. */ public abstract long getCurrentProcessingTime(); } }
这是一个抽象类主要有 4 个方法,简单说一下每个方法的作用:
assignWindows 将某个带有时间戳timestamp的元素element分配给一个或多个窗口,并返回窗口集合
getDefaultTrigger 返回WindowAssigner默认的 trigger
getWindowSerializer 返回一个类型序列化器用来序列化窗口
isEventTime 是否是 event time
然后再来看一下 WindowAssigner 的实现类 UML 图,如下所示:
image-20210316223805
windowAssigner
这里主要展示了 eventime 语义的, 可以看出 WindowAssigner 有 4 种不同的类型:
Tumbling windows
Sliding windows
Session windows
Global windows
接下来看一下大家用的比较多的 TumblingEventTimeWindows 和 SlidingEventTimeWindows 的源码(processing time 的实现类似) 看下窗口的划分到底是怎么实现的?
TumblingEventTimeWindows 源码
@Override public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { if (timestamp > Long.MIN_VALUE) { if (staggerOffset == null) { staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size); } // Long.MIN_VALUE is currently assigned when no timestamp is present long start = TimeWindow.getWindowStartWithOffset(timestamp, (globalOffset + staggerOffset) % size, size); return Collections.singletonList(new TimeWindow(start, start + size)); } else { throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " + "Is the time characteristic set to 'ProcessingTime', or did you forget to call " + "'DataStream.assignTimestampsAndWatermarks(...)'?"); } }
元素的时间戳肯定是大于 Long.MIN_VALUE 的,所以会走到 if 里面 staggerOffset 默认值是空的,所以会先初始化(这个是一个新特性为了解决同一时间触发大量的窗口计算造成的性能问题),然后根据 timestamp 和 size 计算出窗口的开始时间,最后返回一个存储 TimeWindow 的单例集合.
SlidingEventTimeWindows 源码
@Override public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { if (timestamp > Long.MIN_VALUE) { List<TimeWindow> windows = new ArrayList<>((int) (size / slide)); long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide); for (long start = lastStart; start > timestamp - size; start -= slide) { windows.add(new TimeWindow(start, start + size)); } return windows; } else { throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " + "Is the time characteristic set to 'ProcessingTime', or did you forget to call " + "'DataStream.assignTimestampsAndWatermarks(...)'?"); } }
滑动窗口跟上面的滚动窗口最大的不同是数据不是分配到一个窗口,而是分配到 size / slide 个不同的窗口里面,返回的是窗口的集合.
/** * Method to get the window start for a timestamp. * * @param timestamp epoch millisecond to get the window start. * @param offset The offset which window start would be shifted by. * @param windowSize The size of the generated windows. * @return window start */ public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) { return timestamp - (timestamp - offset + windowSize) % windowSize; }
首先会根据元素的 timestamp offset slide 计算出窗口开始的时间戳,然后循环初始化给定的size内不同slide的窗口对象,最后返回一个 List
Session windows 和 Global windows 的实现相对简单这里就不在展开分析了,感兴趣的同学可以自己去看一下.
总结
这篇文章主要解析了 Window Assigner 的实现原理,结合滚动窗口和滑动窗口的源码分析了具体的实现过程.让大家对窗口的划分有更加深入的理解.