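The previous article covered how Flink's window mechanism executes. The class that actually stores the elements of a window and drives its computation is WindowOperator, so this article walks through WindowOperator's execution logic.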
The apply method
Picking up where the previous article left off, we start from the apply method. Here is its code:
```java
private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, Function originalFunction) {

    // Generate the operator name
    final String opName = generateOperatorName(windowAssigner, trigger, evictor, originalFunction, null);
    // Get the key selector
    KeySelector<T, K> keySel = input.getKeySelector();

    WindowOperator<K, T, Iterable<T>, R, W> operator;

    if (evictor != null) {
        @SuppressWarnings({"unchecked", "rawtypes"})
        TypeSerializer<StreamRecord<T>> streamRecordSerializer =
            (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));

        ListStateDescriptor<StreamRecord<T>> stateDesc =
            new ListStateDescriptor<>("window-contents", streamRecordSerializer);

        operator = new EvictingWindowOperator<>(windowAssigner,
            windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
            keySel,
            input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
            stateDesc,
            function,
            trigger,
            evictor,
            allowedLateness,
            lateDataOutputTag);

    } else {
        // Describe the ListState that buffers the window contents
        ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
            input.getType().createSerializer(getExecutionEnvironment().getConfig()));

        // Construct the WindowOperator
        operator = new WindowOperator<>(windowAssigner,
            windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
            keySel,
            input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
            stateDesc,
            function,
            trigger,
            allowedLateness,
            lateDataOutputTag);
    }

    return input.transform(opName, resultType, operator);
}
```
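The method first generates the operator name and gets the key selector. The main logic is a check on whether an evictor is set. If it is, the method constructs an EvictingWindowOperator; otherwise it constructs a plain WindowOperator. EvictingWindowOperator is a subclass of WindowOperator that adds eviction logic: it removes elements from the window contents before and/or after the window function runs. The rest of this article focuses on WindowOperator.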
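The following minimal sketch makes the extra eviction step concrete. It is plain Java with no Flink dependency and is purely hypothetical. `EvictingWindowSketch`, `evictBefore`, and the `sum` stand-in for the user's window function are illustrative names. Keeping only the last `maxCount` elements is just one possible eviction policy, loosely modeled on a count-based evictor.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Flink code: models the extra "evict, then apply"
// step that EvictingWindowOperator adds on top of WindowOperator.
class EvictingWindowSketch {

    // Keep only the last maxCount buffered elements (one possible eviction policy).
    static List<Integer> evictBefore(List<Integer> contents, int maxCount) {
        int from = Math.max(0, contents.size() - maxCount);
        return new ArrayList<>(contents.subList(from, contents.size()));
    }

    // Stand-in for the user's window function: sum the window contents.
    static int sum(Iterable<Integer> contents) {
        int total = 0;
        for (int v : contents) {
            total += v;
        }
        return total;
    }

    // Without an evictor, the function sees every buffered element.
    static int fireWithoutEvictor(List<Integer> contents) {
        return sum(contents);
    }

    // With an evictor, the function only sees what survives eviction.
    static int fireWithEvictor(List<Integer> contents, int maxCount) {
        return sum(evictBefore(contents, maxCount));
    }

    public static void main(String[] args) {
        List<Integer> buffered = List.of(1, 2, 3, 4, 5);
        System.out.println(fireWithoutEvictor(buffered)); // 15
        System.out.println(fireWithEvictor(buffered, 2)); // 9 (4 + 5)
    }
}
```

Eviction needs access to the individual buffered elements. That is why the evicting branch keeps `StreamRecord<T>` values (element plus timestamp) in its ListState.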
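Let's first look at the inheritance hierarchy of WindowOperator.
In short, WindowOperator extends AbstractUdfStreamOperator, which was covered earlier and is the base class for operators that wrap a user-defined function. It also implements the OneInputStreamOperator interface (also covered earlier) and the Triggerable interface. Triggerable is the callback through which the timer service invokes onEventTime and onProcessingTime. AbstractUdfStreamOperator in turn extends AbstractStreamOperator. OneInputStreamOperator extends the StreamOperator interface, which AbstractStreamOperator also implements. These pieces are analyzed in more detail below.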
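The hierarchy can be mirrored with a few stand-in types. This is a simplified sketch, not Flink's real API. The nested types reuse Flink's names for readability, but their methods are cut down to the essentials. For example, the real Triggerable receives an `InternalTimer` rather than a key, namespace, and timestamp, and the user-function type parameter is simplified to `Object`.

```java
// Simplified stand-ins that mirror the shape of Flink's operator hierarchy.
// Signatures are reduced for illustration; they are not Flink's real interfaces.
class HierarchySketch {

    interface StreamOperator<OUT> {
        void open() throws Exception;
    }

    interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {
        void processElement(IN element) throws Exception;
    }

    // Timer callbacks (simplified: the real methods take an InternalTimer).
    interface Triggerable<K, N> {
        void onEventTime(K key, N namespace, long timestamp) throws Exception;
        void onProcessingTime(K key, N namespace, long timestamp) throws Exception;
    }

    abstract static class AbstractStreamOperator<OUT> implements StreamOperator<OUT> {
        @Override
        public void open() throws Exception {
            // common setup (state backends, metrics, ...) would live here
        }
    }

    // Base class for operators that wrap a user-defined function.
    abstract static class AbstractUdfStreamOperator<OUT, F> extends AbstractStreamOperator<OUT> {
        protected final F userFunction;

        AbstractUdfStreamOperator(F userFunction) {
            this.userFunction = userFunction;
        }
    }

    static class WindowOperator<K, IN, OUT, W>
            extends AbstractUdfStreamOperator<OUT, Object>
            implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {

        WindowOperator(Object windowFunction) {
            super(windowFunction);
        }

        @Override public void processElement(IN element) { }
        @Override public void onEventTime(K key, W window, long timestamp) { }
        @Override public void onProcessingTime(K key, W window, long timestamp) { }
    }

    static class EvictingWindowOperator<K, IN, OUT, W> extends WindowOperator<K, IN, OUT, W> {
        EvictingWindowOperator(Object windowFunction) {
            super(windowFunction);
        }
    }

    public static void main(String[] args) {
        Object op = new EvictingWindowOperator<String, Integer, Integer, Long>(new Object());
        System.out.println(op instanceof WindowOperator);          // true
        System.out.println(op instanceof OneInputStreamOperator);  // true
        System.out.println(op instanceof Triggerable);             // true
        System.out.println(op instanceof AbstractStreamOperator);  // true
    }
}
```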
The WindowOperator constructor
```java
/**
 * Creates a new {@code WindowOperator} based on the given policies and user functions.
 */
public WindowOperator(
        WindowAssigner<? super IN, W> windowAssigner,
        TypeSerializer<W> windowSerializer,
        KeySelector<IN, K> keySelector,
        TypeSerializer<K> keySerializer,
        StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor,
        InternalWindowFunction<ACC, OUT, K, W> windowFunction,
        Trigger<? super IN, ? super W> trigger,
        long allowedLateness,
        OutputTag<IN> lateDataOutputTag) {

    super(windowFunction);

    checkArgument(!(windowAssigner instanceof BaseAlignedWindowAssigner),
        "The " + windowAssigner.getClass().getSimpleName() + " cannot be used with a WindowOperator. " +
            "This assigner is only used with the AccumulatingProcessingTimeWindowOperator and " +
            "the AggregatingProcessingTimeWindowOperator");

    checkArgument(allowedLateness >= 0);

    checkArgument(windowStateDescriptor == null || windowStateDescriptor.isSerializerInitialized(),
        "window state serializer is not properly initialized");

    this.windowAssigner = checkNotNull(windowAssigner);
    this.windowSerializer = checkNotNull(windowSerializer);
    this.keySelector = checkNotNull(keySelector);
    this.keySerializer = checkNotNull(keySerializer);
    this.windowStateDescriptor = windowStateDescriptor;
    this.trigger = checkNotNull(trigger);
    this.allowedLateness = allowedLateness;
    this.lateDataOutputTag = lateDataOutputTag;

    setChainingStrategy(ChainingStrategy.ALWAYS);
}
```
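The constructor builds a WindowOperator from the given policies (window assigner, trigger, allowed lateness) and the user function. Most of its body validates arguments. It rejects:
- a BaseAlignedWindowAssigner;
- a negative allowedLateness;
- a window state descriptor whose serializer is not initialized;
- null values for the required fields.

It then assigns the fields and sets the chaining strategy to ALWAYS.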
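The fail-fast style of the constructor can be shown with a small, hypothetical sketch. `WindowOperatorConfig` and `tryCreate` are illustrative names. The local `checkArgument` and `Objects.requireNonNull` stand in for Flink's `Preconditions.checkArgument` and `checkNotNull`. Like those, they throw `IllegalArgumentException` and `NullPointerException` respectively.

```java
import java.util.Objects;

// Hypothetical sketch of the constructor's fail-fast validation.
// checkArgument here is a local stand-in, not Flink's Preconditions class.
class ConstructorChecksSketch {

    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    // Simplified operator configuration; assigner and trigger are just names here.
    static final class WindowOperatorConfig {
        final String windowAssigner;
        final String trigger;
        final long allowedLateness;

        WindowOperatorConfig(String windowAssigner, String trigger, long allowedLateness) {
            checkArgument(allowedLateness >= 0, "allowedLateness must be >= 0");
            // Objects.requireNonNull plays the role of checkNotNull.
            this.windowAssigner = Objects.requireNonNull(windowAssigner, "windowAssigner");
            this.trigger = Objects.requireNonNull(trigger, "trigger");
            this.allowedLateness = allowedLateness;
        }
    }

    // Returns true if construction succeeds, false if any check rejects the arguments.
    static boolean tryCreate(String assigner, String trigger, long lateness) {
        try {
            new WindowOperatorConfig(assigner, trigger, lateness);
            return true;
        } catch (IllegalArgumentException | NullPointerException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(tryCreate("tumbling", "eventTime", 0));   // true
        System.out.println(tryCreate("tumbling", "eventTime", -1));  // false
        System.out.println(tryCreate(null, "eventTime", 0));         // false
    }
}
```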
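WindowOperator has the following important methods:
open: initializes the operator
processElement: called when a new element arrives at the window operator
onEventTime: runs when an event-time timer fires
onProcessingTime: runs when a processing-time timer fires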
The open method
First, let's see what initialization the open method performs:
```java
@Override
public void open() throws Exception {
    super.open();

    this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
    timestampedCollector = new TimestampedCollector<>(output);

    internalTimerService =
        getInternalTimerService("window-timers", windowSerializer, this);

    triggerContext = new Context(null, null);
    processContext = new WindowContext(null);

    windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
        @Override
        public long getCurrentProcessingTime() {
            return internalTimerService.currentProcessingTime();
        }
    };

    // create (or restore) the state that holds the actual window contents
    // NOTE - the state may be null in the case of the overriding evicting window operator
    // Here it is non-null: it is the "window-contents" descriptor created in WindowedStream
    if (windowStateDescriptor != null) {
        windowState = (InternalAppendingState<K, W, IN, ACC, ACC>) getOrCreateKeyedState(windowSerializer, windowStateDescriptor);
    }

    // create the typed and helper states for merging windows
    // Only merging assigners (such as session windows) need this
    if (windowAssigner instanceof MergingWindowAssigner) {

        // store a typed reference for the state of merging windows - sanity check
        if (windowState instanceof InternalMergingState) {
            windowMergingState = (InternalMergingState<K, W, IN, ACC, ACC>) windowState;
        }
        // TODO this sanity check should be here, but is prevented by an incorrect test (pending validation)
        // TODO see WindowOperatorTest.testCleanupTimerWithEmptyFoldingStateForSessionWindows()
        // TODO activate the sanity check once resolved
        // else if (windowState != null) {
        //     throw new IllegalStateException(
        //         "The window uses a merging assigner, but the window state is not mergeable.");
        // }

        @SuppressWarnings("unchecked")
        final Class<Tuple2<W, W>> typedTuple = (Class<Tuple2<W, W>>) (Class<?>) Tuple2.class;

        final TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>(
            typedTuple,
            new TypeSerializer[] {windowSerializer, windowSerializer});

        final ListStateDescriptor<Tuple2<W, W>> mergingSetsStateDescriptor =
            new ListStateDescriptor<>("merging-window-set", tupleSerializer);

        // get the state that stores the merging sets
        mergingSetsState = (InternalListState<K, VoidNamespace, Tuple2<W, W>>)
            getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, mergingSetsStateDescriptor);
        mergingSetsState.setCurrentNamespace(VoidNamespace.INSTANCE);
    }
}
```
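The open method mainly initializes the window state. Along the way it also sets up:
- the late-record counter;
- the timer service ("window-timers");
- the trigger and process contexts;
- the window assigner context.

For merging windows (session windows), it additionally creates the "merging-window-set" state, which tracks how windows have been merged.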
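The window state keeps one list of elements per key and window. The namespace is what selects the window. This can be modeled as a map from (key, namespace) to a list. The sketch is a hypothetical in-memory stand-in, not Flink's state backend. `NamespacedListState` is an illustrative name, and a window is represented by its start timestamp. The key is set by hand here, whereas in Flink the runtime sets the current key before processElement is called.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for keyed, namespaced list state: one list per (key, window).
class NamespacedStateSketch {

    static final class NamespacedListState<K, N, T> {
        // key -> (namespace -> buffered elements)
        private final Map<K, Map<N, List<T>>> store = new HashMap<>();
        private K currentKey;
        private N currentNamespace;

        void setCurrentKey(K key) { this.currentKey = key; }

        void setCurrentNamespace(N namespace) { this.currentNamespace = namespace; }

        void add(T value) {
            store.computeIfAbsent(currentKey, k -> new HashMap<>())
                 .computeIfAbsent(currentNamespace, n -> new ArrayList<>())
                 .add(value);
        }

        // Returns null when nothing is stored, like the null checks on windowState.get().
        List<T> get() {
            Map<N, List<T>> perKey = store.get(currentKey);
            return perKey == null ? null : perKey.get(currentNamespace);
        }

        // Clears only the current (key, namespace) pair.
        void clear() {
            Map<N, List<T>> perKey = store.get(currentKey);
            if (perKey != null) {
                perKey.remove(currentNamespace);
            }
        }
    }

    public static void main(String[] args) {
        NamespacedListState<String, Long, Integer> windowState = new NamespacedListState<>();
        windowState.setCurrentKey("a");
        windowState.setCurrentNamespace(0L);     // window [0, 10)
        windowState.add(1);
        windowState.add(2);
        windowState.setCurrentNamespace(10L);    // window [10, 20)
        windowState.add(3);
        windowState.setCurrentNamespace(0L);
        System.out.println(windowState.get());   // [1, 2]
        windowState.clear();
        System.out.println(windowState.get());   // null
    }
}
```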
The processElement method
When an element reaches the window operator, WindowOperator's processElement method is called:
```java
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    // The windowAssigner first assigns the element to its window(s)
    final Collection<W> elementWindows = windowAssigner.assignWindows(
        element.getValue(), element.getTimestamp(), windowAssignerContext);

    //if element is handled by none of assigned elementWindows
    // Stays true only if every assigned window rejects the element; set to false once any window accepts it
    boolean isSkippedElement = true;

    // The current key, i.e. the key from keyBy
    final K key = this.<K>getKeyedStateBackend().getCurrentKey();

    // Merging assigner (e.g. session windows)?
    if (windowAssigner instanceof MergingWindowAssigner) {
        MergingWindowSet<W> mergingWindows = getMergingWindowSet();

        for (W window: elementWindows) {

            // adding the new window might result in a merge, in that case the actualWindow
            // is the merged window and we work with that. If we don't merge then
            // actualWindow == window
            W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
                @Override
                public void merge(W mergeResult,
                        Collection<W> mergedWindows, W stateWindowResult,
                        Collection<W> mergedStateWindows) throws Exception {

                    if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
                        throw new UnsupportedOperationException("The end timestamp of an " +
                            "event-time window cannot become earlier than the current watermark " +
                            "by merging. Current watermark: " + internalTimerService.currentWatermark() +
                            " window: " + mergeResult);
                    } else if (!windowAssigner.isEventTime()) {
                        long currentProcessingTime = internalTimerService.currentProcessingTime();
                        if (mergeResult.maxTimestamp() <= currentProcessingTime) {
                            throw new UnsupportedOperationException("The end timestamp of a " +
                                "processing-time window cannot become earlier than the current processing time " +
                                "by merging. Current processing time: " + currentProcessingTime +
                                " window: " + mergeResult);
                        }
                    }

                    triggerContext.key = key;
                    triggerContext.window = mergeResult;

                    triggerContext.onMerge(mergedWindows);

                    for (W m: mergedWindows) {
                        triggerContext.window = m;
                        triggerContext.clear();
                        deleteCleanupTimer(m);
                    }

                    // merge the merged state windows into the newly resulting state window
                    windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
                }
            });

            // drop if the window is already late
            if (isWindowLate(actualWindow)) {
                mergingWindows.retireWindow(actualWindow);
                continue;
            }
            isSkippedElement = false;

            W stateWindow = mergingWindows.getStateWindow(actualWindow);
            if (stateWindow == null) {
                throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
            }

            windowState.setCurrentNamespace(stateWindow);
            windowState.add(element.getValue());

            triggerContext.key = key;
            triggerContext.window = actualWindow;

            TriggerResult triggerResult = triggerContext.onElement(element);

            // The trigger fired
            if (triggerResult.isFire()) {
                // Read the buffered contents from state
                ACC contents = windowState.get();
                if (contents == null) {
                    continue;
                }
                emitWindowContents(actualWindow, contents);
            }

            if (triggerResult.isPurge()) {
                windowState.clear();
            }
            registerCleanupTimer(actualWindow);
        }

        // need to make sure to update the merging state in state
        mergingWindows.persist();
    } else {
        // Process each assigned window in turn
        for (W window: elementWindows) {

            // drop if the window is already late
            // (event time: the watermark has reached window.maxTimestamp() + allowedLateness,
            // so this window no longer accepts elements)
            if (isWindowLate(window)) {
                continue;
            }
            // At least one window accepted the element
            isSkippedElement = false;

            // Scope the window state to this window (the namespace)
            windowState.setCurrentNamespace(window);
            // Buffer the element in windowState first
            windowState.add(element.getValue());

            triggerContext.key = key;
            triggerContext.window = window;

            TriggerResult triggerResult = triggerContext.onElement(element);

            // Did the trigger fire?
            if (triggerResult.isFire()) {
                ACC contents = windowState.get();
                if (contents == null) {
                    continue;
                }
                // Hand the contents to the user-defined window function
                emitWindowContents(window, contents);
            }

            // On PURGE, clear this window's state
            if (triggerResult.isPurge()) {
                windowState.clear();
            }
            // Register a timer that clears the window's state at its cleanup time
            registerCleanupTimer(window);
        }
    }

    // side output input event if
    // element not handled by any window
    // late arriving tag has been set
    // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
    // i.e. if no window accepted the element and it is beyond the allowed lateness: send it to the
    // side output when a late-data tag is configured, otherwise drop it
    if (isSkippedElement && isElementLate(element)) {
        if (lateDataOutputTag != null){
            sideOutput(element);
        } else {
            // Count the dropped late record
            this.numLateRecordsDropped.inc();
        }
    }
}
```
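processElement first asks the windowAssigner to assign the element to its windows. It then gets the current key, which is the key from keyBy. Next it branches on whether the assigner is a merging one (session windows), because merging windows need different handling. Here we only look at the non-merging case, the else branch above, which processes each assigned window in turn:
- If the window is already late (in event time, the watermark has reached the window's max timestamp plus the allowed lateness), it is skipped.
- Otherwise the window state's namespace is set to this window, the element is added to windowState, and the trigger is consulted.
- If the trigger fires, the buffered contents are passed to the user-defined window function, which triggers the window computation.
- If the result includes PURGE, the window's contents are cleared.
- Finally, a cleanup timer is registered that will clear the window's state later.

After the loop, the late-data check runs. If no window accepted the element and the element itself is late (its timestamp plus the allowed lateness is at or behind the current watermark), it goes to the side output when a late-data OutputTag is configured. Otherwise it is dropped and counted in numLateRecordsDropped.

That is the overall logic. The session window path is similar but also merges windows and their state. It is not covered here; interested readers can study it on their own.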
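The non-merging branch can be simulated for a single key with event-time tumbling windows. This sketch is hypothetical and simplified:
- windows are identified by their start timestamp;
- the trigger rule mimics an event-time trigger (fire immediately if the watermark has already passed the window end, otherwise continue);
- the watermark is set by hand;
- timers such as registerCleanupTimer are left out, so the on-time firing at the window end is not modeled.

All class and field names are illustrative.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-key simulation of the non-merging branch of processElement
// for event-time tumbling windows. Timers are deliberately left out.
class ProcessElementSketch {

    // Same four fire/purge combinations as Flink's TriggerResult.
    enum TriggerResult {
        CONTINUE(false, false), FIRE(true, false), PURGE(false, true), FIRE_AND_PURGE(true, true);

        final boolean fire;
        final boolean purge;

        TriggerResult(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }
    }

    final long windowSize;
    final long allowedLateness;
    final boolean hasLateDataTag;                              // stand-in for lateDataOutputTag != null
    long currentWatermark = Long.MIN_VALUE;                    // set by the caller in this sketch
    long numLateRecordsDropped = 0;
    final Map<Long, List<Long>> windowState = new HashMap<>(); // window start -> elements
    final List<String> emitted = new ArrayList<>();
    final List<Long> sideOutput = new ArrayList<>();

    ProcessElementSketch(long windowSize, long allowedLateness, boolean hasLateDataTag) {
        this.windowSize = windowSize;
        this.allowedLateness = allowedLateness;
        this.hasLateDataTag = hasLateDataTag;
    }

    long assignWindow(long timestamp) {   // tumbling window start, offset 0
        return timestamp - Math.floorMod(timestamp, windowSize);
    }

    long maxTimestamp(long start) {       // last timestamp inside [start, start + size)
        return start + windowSize - 1;
    }

    boolean isWindowLate(long start) {    // watermark reached maxTimestamp + allowedLateness
        return maxTimestamp(start) + allowedLateness <= currentWatermark;
    }

    boolean isElementLate(long timestamp) {
        return timestamp + allowedLateness <= currentWatermark;
    }

    // Event-time-trigger-like rule: fire at once if the watermark already passed the window end.
    TriggerResult onElement(long start) {
        return maxTimestamp(start) <= currentWatermark ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    void processElement(long timestamp) {
        boolean isSkippedElement = true;
        long window = assignWindow(timestamp);  // a tumbling assigner yields exactly one window
        if (!isWindowLate(window)) {
            isSkippedElement = false;
            windowState.computeIfAbsent(window, w -> new ArrayList<>()).add(timestamp);
            TriggerResult result = onElement(window);
            if (result.fire) {
                emitted.add("window " + window + " -> " + windowState.get(window));
            }
            if (result.purge) {
                windowState.remove(window);
            }
        }
        if (isSkippedElement && isElementLate(timestamp)) {
            if (hasLateDataTag) {
                sideOutput.add(timestamp);
            } else {
                numLateRecordsDropped++;
            }
        }
    }

    public static void main(String[] args) {
        ProcessElementSketch op = new ProcessElementSketch(10, 5, true);
        op.processElement(3);      // buffered in window [0, 10), CONTINUE
        op.processElement(7);
        op.currentWatermark = 12;  // past the window end, still within allowed lateness
        op.processElement(8);      // buffered, and the trigger fires immediately
        op.currentWatermark = 20;  // 9 + 5 <= 20: window [0, 10) is now late
        op.processElement(4);      // skipped and late -> side output
        System.out.println(op.emitted);     // [window 0 -> [3, 7, 8]]
        System.out.println(op.sideOutput);  // [4]
    }
}
```

In this run, element 8 arrives after the window end but within the allowed lateness. It is still added, and the trigger fires immediately with all buffered elements. Element 4 arrives after the window has passed its cleanup time, so it goes to the side output.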
The onEventTime method
This method is called when an event-time timer fires, i.e. once the watermark passes the timer's timestamp:
```java
@Override
public void onEventTime(InternalTimer<K, W> timer) throws Exception {
    // Restore the key and window this timer belongs to
    triggerContext.key = timer.getKey();
    triggerContext.window = timer.getNamespace();

    MergingWindowSet<W> mergingWindows;

    // Merging (session) windows
    if (windowAssigner instanceof MergingWindowAssigner) {
        mergingWindows = getMergingWindowSet();
        // Look up the state window: the window whose namespace actually holds the contents
        W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
        if (stateWindow == null) {
            // Timer firing for non-existent window, this can only happen if a
            // trigger did not clean up timers. We have already cleared the merging
            // window and therefore the Trigger state, however, so nothing to do.
            return;
        } else {
            // Scope the window state to the state window
            windowState.setCurrentNamespace(stateWindow);
        }
    } else {
        // Scope the window state to this window
        windowState.setCurrentNamespace(triggerContext.window);
        mergingWindows = null;
    }

    TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());

    // Should the window fire?
    if (triggerResult.isFire()) {
        // Read the window contents and run the computation
        ACC contents = windowState.get();
        if (contents != null) {
            emitWindowContents(triggerContext.window, contents);
        }
    }

    // On PURGE, clear this window's state
    if (triggerResult.isPurge()) {
        windowState.clear();
    }

    // For event-time windows, a timer at the window's cleanup time means the window is finished
    // (allowed lateness included), so all of its state is cleared
    if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
        clearAllState(triggerContext.window, windowState, mergingWindows);
    }

    // Persist the merging-window bookkeeping
    if (mergingWindows != null) {
        // need to make sure to update the merging state in state
        mergingWindows.persist();
    }
}
```
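The processing-time path is not analyzed in detail here. onProcessingTime follows essentially the same logic as onEventTime, with two differences. It asks the trigger via triggerContext.onProcessingTime(...) instead of onEventTime(...). It also clears all window state at cleanup time only when the window assigner is not event-time.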
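The timer side can be sketched in the same spirit. Event-time timers are kept in a sorted map and fired in timestamp order as the watermark advances. This is a hypothetical simulation for one key and tumbling windows, and all names are illustrative. The FIRE-exactly-at-maxTimestamp rule is modeled on an event-time trigger. `cleanupTime` mimics WindowOperator's saturation at `Long.MAX_VALUE` when `maxTimestamp + allowedLateness` overflows. Identical (timestamp, window) timers are stored once here; Flink's timer service likewise deduplicates identical timers. As a result, with `allowedLateness = 0` a single timer both fires the window and cleans it up.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical single-key simulation of event-time timers and onEventTime
// for tumbling windows: a trigger timer at maxTimestamp, a cleanup timer at
// maxTimestamp + allowedLateness.
class OnEventTimeSketch {

    final long windowSize;
    final long allowedLateness;
    // timer timestamp -> window starts; the set stores each (time, window) timer once
    final TreeMap<Long, TreeSet<Long>> eventTimeTimers = new TreeMap<>();
    final Map<Long, List<Long>> windowState = new HashMap<>();
    final List<String> log = new ArrayList<>();

    OnEventTimeSketch(long windowSize, long allowedLateness) {
        this.windowSize = windowSize;
        this.allowedLateness = allowedLateness;
    }

    long maxTimestamp(long start) {
        return start + windowSize - 1;
    }

    // Saturates at Long.MAX_VALUE if maxTimestamp + allowedLateness overflows.
    long cleanupTime(long start) {
        long cleanup = maxTimestamp(start) + allowedLateness;
        return cleanup >= maxTimestamp(start) ? cleanup : Long.MAX_VALUE;
    }

    void registerTimer(long time, long window) {
        eventTimeTimers.computeIfAbsent(time, t -> new TreeSet<>()).add(window);
    }

    // Buffer an element and register the two timers the window needs.
    void add(long start, long value) {
        windowState.computeIfAbsent(start, w -> new ArrayList<>()).add(value);
        registerTimer(maxTimestamp(start), start);  // trigger timer
        registerTimer(cleanupTime(start), start);   // cleanup timer
    }

    // Fire every timer whose timestamp is <= the new watermark, in timestamp order.
    void advanceWatermark(long watermark) {
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.firstKey() <= watermark) {
            Map.Entry<Long, TreeSet<Long>> entry = eventTimeTimers.pollFirstEntry();
            for (long window : entry.getValue()) {
                onEventTime(entry.getKey(), window);
            }
        }
    }

    void onEventTime(long time, long window) {
        boolean fire = time == maxTimestamp(window);  // FIRE exactly at the window end
        List<Long> contents = windowState.get(window);
        if (fire && contents != null) {
            log.add("fire " + window + " " + contents);
        }
        if (time == cleanupTime(window)) {            // like isCleanupTime -> clearAllState
            windowState.remove(window);
            log.add("cleanup " + window);
        }
    }

    public static void main(String[] args) {
        OnEventTimeSketch op = new OnEventTimeSketch(10, 5);
        op.add(0, 3);
        op.add(0, 7);
        op.advanceWatermark(9);    // trigger timer at 9 fires window [0, 10)
        op.advanceWatermark(14);   // cleanup timer at 9 + 5 clears its state
        System.out.println(op.log); // [fire 0 [3, 7], cleanup 0]
    }
}
```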
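WindowOperator's source is fairly long and contains many more details. The analysis above covers only the main logic; the details are worth studying more closely on your own.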