Flink源码分析:WindowOperator底层实现

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 上一篇文章介绍了 Flink窗口机制的执行流程,其实WindowOperator才是真正负责window中元素存储和计算流程的核心类。这篇文章主要就是分析一下WindowOperator的执行逻辑。apply方法接着上一篇从apply方法入手,先来看一下apply的代码逻辑。

上一篇文章介绍了 Flink窗口机制的执行流程,其实WindowOperator才是真正负责window中元素存储和计算流程的核心类。这篇文章主要就是分析一下WindowOperator的执行逻辑。


apply方法


接着上一篇从apply方法入手,先来看一下apply的代码逻辑。


private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, Function originalFunction) {
        // 生成operator的名字
    final String opName = generateOperatorName(windowAssigner, trigger, evictor, originalFunction, null);
    // 获取key的选择器
    KeySelector<T, K> keySel = input.getKeySelector();
    WindowOperator<K, T, Iterable<T>, R, W> operator;
    if (evictor != null) {
      @SuppressWarnings({"unchecked", "rawtypes"})
      TypeSerializer<StreamRecord<T>> streamRecordSerializer =
          (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
      ListStateDescriptor<StreamRecord<T>> stateDesc =
          new ListStateDescriptor<>("window-contents", streamRecordSerializer);
      operator =
        new EvictingWindowOperator<>(windowAssigner,
          windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
          keySel,
          input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
          stateDesc,
          function,
          trigger,
          evictor,
          allowedLateness,
          lateDataOutputTag);
    } else {
      // 定义ListState的状态描述
      ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
        input.getType().createSerializer(getExecutionEnvironment().getConfig()));
      // 构造windowOperator
      operator =
        new WindowOperator<>(windowAssigner,
          windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
          keySel,
          input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
          stateDesc,
          function,
          trigger,
          allowedLateness,
          lateDataOutputTag);
    }
    return input.transform(opName, resultType, operator);
  }


首先是生成operator的name,获取key的选择器,然后主要就是判断evictor是否为空,走不同的构造WindowOperator的逻辑,如果evictor不为空就构造EvictingWindowOperator对象,否则就构造WindowOperator对象,其实EvictingWindowOperator是WindowOperator的一个子类,只是多了一个删除数据的逻辑。我们下面以WindowOperator对象为主来进行分析。


先来看一下WindowOperator对象的继承关系图如下:




简单说一下,WindowOperator继承了 AbstractUdfStreamOperator (这个前面也说过了,所以的operator都会继承自它),然后WindowOperator????实现了OneInputStreamOperator接口(这个前面也说过了),AbstractUdfStreamOperator又继承了AbstractStreamOperator这个对象,OneInputStreamOperator接口????继承了StreamOperator这个接口,AbstractStreamOperator对象也实现了StreamOperator接口。下面会在具体的分析。


WindowOperator构造方法


/**
   * Creates a new {@code WindowOperator} based on the given policies and user functions.
   */
  public WindowOperator(
      WindowAssigner<? super IN, W> windowAssigner,
      TypeSerializer<W> windowSerializer,
      KeySelector<IN, K> keySelector,
      TypeSerializer<K> keySerializer,
      StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor,
      InternalWindowFunction<ACC, OUT, K, W> windowFunction,
      Trigger<? super IN, ? super W> trigger,
      long allowedLateness,
      OutputTag<IN> lateDataOutputTag) {
    super(windowFunction);
    checkArgument(!(windowAssigner instanceof BaseAlignedWindowAssigner),
      "The " + windowAssigner.getClass().getSimpleName() + " cannot be used with a WindowOperator. " +
        "This assigner is only used with the AccumulatingProcessingTimeWindowOperator and " +
        "the AggregatingProcessingTimeWindowOperator");
    checkArgument(allowedLateness >= 0);
    checkArgument(windowStateDescriptor == null || windowStateDescriptor.isSerializerInitialized(),
        "window state serializer is not properly initialized");
    this.windowAssigner = checkNotNull(windowAssigner);
    this.windowSerializer = checkNotNull(windowSerializer);
    this.keySelector = checkNotNull(keySelector);
    this.keySerializer = checkNotNull(keySerializer);
    this.windowStateDescriptor = windowStateDescriptor;
    this.trigger = checkNotNull(trigger);
    this.allowedLateness = allowedLateness;
    this.lateDataOutputTag = lateDataOutputTag;
    setChainingStrategy(ChainingStrategy.ALWAYS);
  }


首先会根据给定的策略和自定义的方法构造WindowOperator对象,构造方法里面主要就是检查一系列的参数是否为空,然后初始化这些变量。


WindowOperator包含如下几个重要方法:


open:operator初始化的逻辑


processElement:新元素进入window的时候调用


onEventTime:event time计算触发时候的逻辑


onProcessingTime:processing time计算触发时候的逻辑


open方法:


先来看一下open方法里面都做了哪些初始化操作


@Override
  public void open() throws Exception {
    super.open();
    this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
    timestampedCollector = new TimestampedCollector<>(output);
    internalTimerService =
        getInternalTimerService("window-timers", windowSerializer, this);
    triggerContext = new Context(null, null);
    processContext = new WindowContext(null);
    windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
      @Override
      public long getCurrentProcessingTime() {
        return internalTimerService.currentProcessingTime();
      }
    };
    // create (or restore) the state that hold the actual window contents
    // NOTE - the state may be null in the case of the overriding evicting window operator
    // 在这里它是不为空的就是刚才WindowedStream里面创建的
    if (windowStateDescriptor != null) {
      windowState = (InternalAppendingState<K, W, IN, ACC, ACC>) getOrCreateKeyedState(windowSerializer, windowStateDescriptor);
    }
    // create the typed and helper states for merging windows
    // 这个是session window才会有的合并窗口用的
    if (windowAssigner instanceof MergingWindowAssigner) {
      // store a typed reference for the state of merging windows - sanity check
      if (windowState instanceof InternalMergingState) {
        windowMergingState = (InternalMergingState<K, W, IN, ACC, ACC>) windowState;
      }
      // TODO this sanity check should be here, but is prevented by an incorrect test (pending validation)
      // TODO see WindowOperatorTest.testCleanupTimerWithEmptyFoldingStateForSessionWindows()
      // TODO activate the sanity check once resolved
//      else if (windowState != null) {
//        throw new IllegalStateException(
//            "The window uses a merging assigner, but the window state is not mergeable.");
//      }
      @SuppressWarnings("unchecked")
      final Class<Tuple2<W, W>> typedTuple = (Class<Tuple2<W, W>>) (Class<?>) Tuple2.class;
      final TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>(
          typedTuple,
          new TypeSerializer[] {windowSerializer, windowSerializer});
      final ListStateDescriptor<Tuple2<W, W>> mergingSetsStateDescriptor =
          new ListStateDescriptor<>("merging-window-set", tupleSerializer);
      // get the state that stores the merging sets
      mergingSetsState = (InternalListState<K, VoidNamespace, Tuple2<W, W>>)
          getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, mergingSetsStateDescriptor);
      mergingSetsState.setCurrentNamespace(VoidNamespace.INSTANCE);
    }
  }


open方法里面主要就是初始化窗口的状态,如果是session window的话,会多初始化一个关于合并窗口的状态。


processElement方法

当数据达到window的时候,会调用windowoperator的processElement方法


@Override
  public void processElement(StreamRecord<IN> element) throws Exception {
    // windowAssigner先把数据分配到不同的窗口中
    final Collection<W> elementWindows = windowAssigner.assignWindows(
      element.getValue(), element.getTimestamp(), windowAssignerContext);
    //if element is handled by none of assigned elementWindows
    // 如果元素不是由指定的元素窗口处理的 (标记数据是否还需要处理 如果元素被处理过 返回fasle)
    boolean isSkippedElement = true;
    // 获取当前的key
    final K key = this.<K>getKeyedStateBackend().getCurrentKey();
    // 判断是否是session window
    if (windowAssigner instanceof MergingWindowAssigner) {
      MergingWindowSet<W> mergingWindows = getMergingWindowSet();
      for (W window: elementWindows) {
        // adding the new window might result in a merge, in that case the actualWindow
        // is the merged window and we work with that. If we don't merge then
        // actualWindow == window
        W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
          @Override
          public void merge(W mergeResult,
              Collection<W> mergedWindows, W stateWindowResult,
              Collection<W> mergedStateWindows) throws Exception {
            if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
              throw new UnsupportedOperationException("The end timestamp of an " +
                  "event-time window cannot become earlier than the current watermark " +
                  "by merging. Current watermark: " + internalTimerService.currentWatermark() +
                  " window: " + mergeResult);
            } else if (!windowAssigner.isEventTime()) {
              long currentProcessingTime = internalTimerService.currentProcessingTime();
              if (mergeResult.maxTimestamp() <= currentProcessingTime) {
                throw new UnsupportedOperationException("The end timestamp of a " +
                  "processing-time window cannot become earlier than the current processing time " +
                  "by merging. Current processing time: " + currentProcessingTime +
                  " window: " + mergeResult);
              }
            }
            triggerContext.key = key;
            triggerContext.window = mergeResult;
            triggerContext.onMerge(mergedWindows);
            for (W m: mergedWindows) {
              triggerContext.window = m;
              triggerContext.clear();
              deleteCleanupTimer(m);
            }
            // merge the merged state windows into the newly resulting state window
            windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
          }
        });
        // drop if the window is already late
        if (isWindowLate(actualWindow)) {
          mergingWindows.retireWindow(actualWindow);
          continue;
        }
        isSkippedElement = false;
        W stateWindow = mergingWindows.getStateWindow(actualWindow);
        if (stateWindow == null) {
          throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
        }
        windowState.setCurrentNamespace(stateWindow);
        windowState.add(element.getValue());
        triggerContext.key = key;
        triggerContext.window = actualWindow;
        TriggerResult triggerResult = triggerContext.onElement(element);
        // 窗口触发的时候
        if (triggerResult.isFire()) {
          // 从状态里面把数据拿出来
          ACC contents = windowState.get();
          if (contents == null) {
            continue;
          }
          emitWindowContents(actualWindow, contents);
        }
        if (triggerResult.isPurge()) {
          windowState.clear();
        }
        registerCleanupTimer(actualWindow);
      }
      // need to make sure to update the merging state in state
      mergingWindows.persist();
    } else {
      // 循环处理每一个窗口
      for (W window: elementWindows) {
        // drop if the window is already late
        // 如果窗口已经晚了就删除 如果水印在结束时间戳加上允许的延迟之后
        // 如果watermark超过了window_end_time + allowlate_time 就不需要处理了
        if (isWindowLate(window)) {
          continue;
        }
        // 标记为fasle
        isSkippedElement = false;
        // 设置窗口状态的namespace
        windowState.setCurrentNamespace(window);
        // 把数据先保存在windowState里面
        windowState.add(element.getValue());
        triggerContext.key = key;
        triggerContext.window = window;
        TriggerResult triggerResult = triggerContext.onElement(element);
        // 判断窗口是否触发
        if (triggerResult.isFire()) {
          ACC contents = windowState.get();
          if (contents == null) {
            continue;
          }
          // 发送数据到我们定义的function里面 触发窗口的计算逻辑
          emitWindowContents(window, contents);
        }
        // 如果是purge就清除窗口状态的数据
        if (triggerResult.isPurge()) {
          windowState.clear();
        }
        // 注册一个timer去删除窗口里面的数据
        registerCleanupTimer(window);
      }
    }
    // side output input event if
    // element not handled by any window
    // late arriving tag has been set
    // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
    // 设置了晚到时间 并且在晚到时间内数据达到 如果定义了测流输出就把数据用测流输出 否则就删除晚到的数据
    if (isSkippedElement && isElementLate(element)) {
      if (lateDataOutputTag != null){
        sideOutput(element);
      } else {
        // 迟到被丢弃的数据 + 1
        this.numLateRecordsDropped.inc();
      }
    }
  }


先是windowAssigner 把数据分配到不同的窗口中,然后获取当前的key,这个key就是keyby里面的那个key。然后又判断是否是session window 分别走两个不同的处理逻辑,因为session window和其他的window 的逻辑是不一样的,这里我们主要是分析不是session window的情况,也就是上面else里面的逻辑,循环处理每一个window,如果是迟到的窗口会直接忽略,设置当前窗口的namespace,把数据先保存到windowstate里面,判断窗口是否触发,如果触发就发送数据到我们定义的function里面 触发窗口的计算逻辑 ,如果触发了purge操作,则清空window中的内容 最后注册一个timer去删除窗口里面的数据 循环处理完后 判断数据是否晚到 并且在晚到时间内数据达到 如果定义了测流输出就把数据用测流输出 否则就删除晚到的数据 大体的逻辑就是这样,session window的逻辑和这个差不多,但是会有合并状态的过程,这里就不在分析了,有兴趣的可以自己看一下。


onEventTime方法

基于eventtime触发计算的时候会调用这个方法。


@Override
  public void onEventTime(InternalTimer<K, W> timer) throws Exception {
    // 获取key和window
    triggerContext.key = timer.getKey();
    triggerContext.window = timer.getNamespace();
    MergingWindowSet<W> mergingWindows;
    // 如果是session window的话
    if (windowAssigner instanceof MergingWindowAssigner) {
      mergingWindows = getMergingWindowSet();
      // 获取给定的状态窗口 状态窗口是我们保留窗口的实际状态
      W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
      if (stateWindow == null) {
        // Timer firing for non-existent window, this can only happen if a
        // trigger did not clean up timers. We have already cleared the merging
        // window and therefore the Trigger state, however, so nothing to do.
        return;
      } else {
        // 设置当前窗口的命名空间
        windowState.setCurrentNamespace(stateWindow);
      }
    } else {
      // 设置当前窗口的命名空间
      windowState.setCurrentNamespace(triggerContext.window);
      mergingWindows = null;
    }
    TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
    // 判断是否需要触发计算
    if (triggerResult.isFire()) {
      // 获取窗口的数据 开始计算
      ACC contents = windowState.get();
      if (contents != null) {
        emitWindowContents(triggerContext.window, contents);
      }
    }
    // 如果是purge trigger 则删除窗口状态的数据
    if (triggerResult.isPurge()) {
      windowState.clear();
    }
    // 如果是event time类型,并且定时器触发时间是window的cleanup时间的时候,意味着该窗口的数据已经处理完毕,需要清除该窗口的所有状态
    if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
      clearAllState(triggerContext.window, windowState, mergingWindows);
    }
    // 持久化合并窗口的状态
    if (mergingWindows != null) {
      // need to make sure to update the merging state in state
      mergingWindows.persist();
    }
  }


基于processingtime的计算这里就不在分析了,跟上面的eventtime逻辑完全一样,只是TriggerResult 调用的时候时间不一样。


WindowOperator的源码还是比较多的,里面还有很多细节的地方,上面只是分析了主要的逻辑实现,细节方面还需要我们仔细去分析。


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
Java 流计算
Flink中异步AsyncIO的实现 (源码分析)
Flink中异步AsyncIO的实现 (源码分析)
|
10月前
|
SQL API 流计算
Flink SQL代码补全提示(源码分析)
Flink SQL代码补全提示(源码分析)
60 0
|
SQL 存储 缓存
Flink进行Paimon写入源码分析
本文主要解析了Flink写入Paimon的核心流程。
|
存储 消息中间件 缓存
Flink进行Hudi写入源码分析
本文主要解析了Flink将DataStream写入到Hudi表的核心流程
|
SQL API Apache
Flink SQL代码补全提示(源码分析)
使用过Navicat的童鞋都知道,当我们写SQL的时候,工具会根据我们输入的内容弹出提示,这样可以很方便我们去写SQL
411 0
Flink SQL代码补全提示(源码分析)
|
SQL Java API
Flink 1.13.0 sql-client 新特性及源码分析
在 Flink 1.13.0 版本中增加了很多新特征,具体可以参考前面一篇文章,其中很重要的一点是对 sql-client 功能做了加强,支持了初始化脚本和执行 SQL 文件,SQL 客户端是直接运行和部署 SQL 流和批处理作业的便捷方法,而无需从命令行或作为 CI 的一部分来编写任何代码,这个版本大大改进了 SQL 客户端的功能。现在,SQL 客户端和SQL 脚本都支持 Java 应用程序可用的几乎所有操作(通过编程方式从 TableEnvironment 启动查询时)。这意味着 SQL 用户在其 SQL 部署中需要粘贴的代码变的更少.由于篇幅的原因这篇文章只会介绍 SQL CLIENT
Flink 1.13.0 sql-client 新特性及源码分析
|
Apache 调度 流计算
【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析3
【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析3
230 0
【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析3
|
分布式计算 数据处理 API
【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析1
【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析1
231 0
【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析1
|
SQL 缓存 JSON
Java SPI 机制在 Flink 中的应用(源码分析)
我们在使用 Flink SQL 的时候是否有过这样的疑问? Flink 提供了各种各样的 connector 我们只需要在 DML 里面定义即可运行,那它是怎么找到要执行的代码呢? 它是怎么知道代码对应关系的呢? 其实 Flink 是通过 Java 的 SPI(并不是Flink发明创造的) 机制来实现的,下面就来深入源码分析一下其实现过程. 什么是 SPI ?
Java SPI 机制在 Flink 中的应用(源码分析)
|
流计算 Windows
Flink源码分析: 窗口机制的执行流程
这篇文章主要是研究一下Flink的window执行流程,但是不会详细的分析代码实现的细节,因为这部分的代码还是非常多的,先了解一下代码执行的整个流程,为后面分析WindowOperator的源码实现逻辑做一个铺垫. 关于Flink的window使用相信大家都比较熟悉了,日常开发中很多场景都会用到window,可以说window是Flink流计算的核心功能之一,我们先来看下官网对于window的使用流程介绍.(这里以keyed Windows为例).