Flink WindowOperator 源码分析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 0x1 摘要 WindowOperator可以说是Flink窗口功能非常核心核心的类,是窗口功能源码的一条主线,延着这条主线去慢慢看源码会轻松很多。注:此文基于Flink 1.4.2 版本源码。 0x2 WindowOperator 类结构分析 先来看一下类结构图,可以使用idea来生成类图,下图.

0x1 摘要

WindowOperator可以说是Flink窗口功能非常核心核心的类,是窗口功能源码的一条主线,延着这条主线去慢慢看源码会轻松很多。注:此文基于Flink 1.4.2 版本源码。

0x2 WindowOperator 类结构分析

先来看一下类结构图,可以使用idea来生成类图,下图经过稍微加工,去掉一些不重要类的结构图:
ca2a6732_bc3a_444f_b6d0_7aa927d16def
我们核心重点关注以下一个接口:

  • OneInputStreamOperator
public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {
 /**
  * Processes one element that arrived at this operator.
  * This method is guaranteed to not be called concurrently with other methods of the operator.
  */
 void processElement(StreamRecord<IN> element) throws Exception;

 /**
  * Processes a {@link Watermark}.
  * This method is guaranteed to not be called concurrently with other methods of the operator.
  *
  * @see org.apache.flink.streaming.api.watermark.Watermark
  */
 void processWatermark(Watermark mark) throws Exception;

 void processLatencyMarker(LatencyMarker latencyMarker) throws Exception;
}

0x3 OneInputStreamOperator 具体实现分析

此接口三个方法WindowOperator类只实现了processElement方法,其余两个方法实现全部在AbstractStreamOperator抽象类中,此文不去讲解,此文重点介绍processElement方法,这个方法也是最重要的方法。

从方法注释可以看出,每一条消息过来都会调用此方法,此方法主体很清晰,看下面条件判断语句:

final Collection<W> elementWindows = windowAssigner.assignWindows(
    element.getValue(), element.getTimestamp(), windowAssignerContext);

//if element is handled by none of assigned elementWindows
boolean isSkippedElement = true;

final K key = this.<K>getKeyedStateBackend().getCurrentKey();

if (windowAssigner instanceof MergingWindowAssigner) {
    ...
} else {
    ...
}

// side output input event if
// element not handled by any window
// late arriving tag has been set
// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
if (isSkippedElement && isElementLate(element)) {
    if (lateDataOutputTag != null){
        sideOutput(element);
    } else {
        this.numLateRecordsDropped.inc();
    }
}

分为合并窗口分配器和非合并窗口分配器,我们平时使用的TumblingProcessingTimeWindows都属于非合并窗口,今天就介绍非合并窗口,即代码中else逻辑。
原代码如下:

for (W window: elementWindows) {

    // drop if the window is already late
    if (isWindowLate(window)) {
        continue;
    }
    isSkippedElement = false;

    windowState.setCurrentNamespace(window);
    windowState.add(element.getValue());

    triggerContext.key = key;
    triggerContext.window = window;

    TriggerResult triggerResult = triggerContext.onElement(element);

    if (triggerResult.isFire()) {
        ACC contents = windowState.get();
        if (contents == null) {
            continue;
        }
        emitWindowContents(window, contents);
    }

    if (triggerResult.isPurge()) {
        windowState.clear();
    }
    registerCleanupTimer(window);
}

第一步:判断窗口是否延迟,如果延迟直接踩过,判断延迟的逻辑相对简单可自行查看源码
第二步:设置isSkippedElement标志位,此标志位等于false说明,当前元素可以匹配到窗口,true说明匹配不到窗口,后面会有处理逻辑
第三步:下面四行代码是一些状态设置
第四步:根据当前元素返回一个触发器结果
第五步:判断触发器结果是否需要执行,如果需要执行,则调用emitWindowContents方法执行
第六步:判断是否需要清理窗口状态信息
第七步:注册清除定时器

protected void registerCleanupTimer(W window) {
    long cleanupTime = cleanupTime(window);
    if (cleanupTime == Long.MAX_VALUE) {
        // don't set a GC timer for "end of time"
        return;
    }

    if (windowAssigner.isEventTime()) {
        triggerContext.registerEventTimeTimer(cleanupTime);
    } else {
        triggerContext.registerProcessingTimeTimer(cleanupTime);
    }
}

首先计算清除时间:

private long cleanupTime(W window) {
    if (windowAssigner.isEventTime()) {
        long cleanupTime = window.maxTimestamp() + allowedLateness;
        return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
    } else {
        return window.maxTimestamp();
    }
}

如果是事件时间则需要算上允许延迟时间,调用triggerContext注册Time

注:processElement方法开头代码

final Collection<W> elementWindows = windowAssigner.assignWindows(
    element.getValue(), element.getTimestamp(), windowAssignerContext);

这段代码是窗口的分配,后面单独文章来分析窗口分配实现原理。

0x4 结束语

整个WindowOperator核心流程代码不多,但代码量还是比较大,里面涉及到窗口分配、时间触发器,每个点都涉及比较多的源码,不能一次性去讲完,需要慢慢去挖。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
5月前
|
SQL API 流计算
Flink SQL代码补全提示(源码分析)
Flink SQL代码补全提示(源码分析)
45 0
|
8月前
|
SQL 存储 缓存
Flink进行Paimon写入源码分析
本文主要解析了Flink写入Paimon的核心流程。
|
8月前
|
存储 消息中间件 缓存
Flink进行Hudi写入源码分析
本文主要解析了Flink将DataStream写入到Hudi表的核心流程
|
SQL API Apache
Flink SQL代码补全提示(源码分析)
使用过Navicat的童鞋都知道,当我们写SQL的时候,工具会根据我们输入的内容弹出提示,这样可以很方便我们去写SQL
377 0
Flink SQL代码补全提示(源码分析)
|
Apache 调度 流计算
【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析3
【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析3
206 0
【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析3
|
BI Apache 流计算
【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析2
【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析2
146 0
|
分布式计算 数据处理 API
【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析1
【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析1
195 0
【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析1
|
SQL Java API
Flink 1.13.0 sql-client 新特性及源码分析
在 Flink 1.13.0 版本中增加了很多新特征,具体可以参考前面一篇文章,其中很重要的一点是对 sql-client 功能做了加强,支持了初始化脚本和执行 SQL 文件,SQL 客户端是直接运行和部署 SQL 流和批处理作业的便捷方法,而无需从命令行或作为 CI 的一部分来编写任何代码,这个版本大大改进了 SQL 客户端的功能。现在,SQL 客户端和SQL 脚本都支持 Java 应用程序可用的几乎所有操作(通过编程方式从 TableEnvironment 启动查询时)。这意味着 SQL 用户在其 SQL 部署中需要粘贴的代码变的更少.由于篇幅的原因这篇文章只会介绍 SQL CLIENT
Flink 1.13.0 sql-client 新特性及源码分析
|
SQL 缓存 JSON
Java SPI 机制在 Flink 中的应用(源码分析)
我们在使用 Flink SQL 的时候是否有过这样的疑问? Flink 提供了各种各样的 connector 我们只需要在 DML 里面定义即可运行,那它是怎么找到要执行的代码呢? 它是怎么知道代码对应关系的呢? 其实 Flink 是通过 Java 的 SPI(并不是Flink发明创造的) 机制来实现的,下面就来深入源码分析一下其实现过程. 什么是 SPI ?
Java SPI 机制在 Flink 中的应用(源码分析)
|
存储 流计算
Flink源码分析:WindowOperator底层实现
上一篇文章介绍了 Flink窗口机制的执行流程,其实WindowOperator才是真正负责window中元素存储和计算流程的核心类。这篇文章主要就是分析一下WindowOperator的执行逻辑。 apply方法 接着上一篇从apply方法入手,先来看一下apply的代码逻辑。