Apache Flink源码解析之stream-window

简介: window(窗口)是Flink流处理中非常重要的概念,本篇我们来对窗口相关的概念以及关联的实现进行解析。本篇的内容主要集中在package org.apache.flink.streaming.api.windowing下。

window(窗口)是Flink流处理中非常重要的概念,本篇我们来对窗口相关的概念以及关联的实现进行解析。本篇的内容主要集中在package org.apache.flink.streaming.api.windowing下。

Window

一个Window代表有限对象的集合。一个窗口有一个最大的时间戳,该时间戳意味着在其代表的某时间点——所有应该进入这个窗口的元素都已经到达。

Flink的根窗口对象是一个抽象类,只提供了一个抽象方法:

    public abstract long maxTimestamp();

用于获取最大的时间戳。Flink提供了两个窗口的具体实现。在实现Window时,子类应该override equalshashCode这两个方法,以使得在逻辑上两个相等的window被认为是同一个。

GlobalWindow

GlobalWindow是一个全局窗口,被实现为单例模式。其maxTimestamp被设置为Long.MAX_VALUE

该类内部有一个静态类定义了GlobalWindow的序列化器:Serializer

TimeWindow

TimeWindow表示一个时间间隔窗口,这体现在其构造器需要注入的两个属性:

  • start : 时间间隔的起始
  • end : 时间间隔的截止

TimeWindow表示的时间间隔为[start, end)。其maxTimestamp的实现为:

    public long maxTimestamp() {
        return end - 1;
    }

其equals的实现中,除了常规比较(比较引用,比较Class的实例),还会比较start,end这两个属性。

TimeWindow也在内部实现了序列化器,该序列化器主要针对startend两个属性。

WindowAssigner

元素的窗口分配器。用于将元素分配给一个或者多个窗口。该抽象类定义了三个抽象方法:

  • assignWindows :将某个带有时间戳timestamp的元素element分配给一个或多个窗口,并返回窗口集合
  • getDefaultTrigger :返回跟WindowAssigner关联的默认触发器
  • getWindowSerializer :返回WindowAssigner分配的窗口的序列化器

内置的WindowAssigner

整个类型继承图如下:

flink-stream-window_window-assigner-all-class-diagram

下面会谈到很多基于时间的窗口,这里有两个概念,分别是时间类型窗口类型

时间类型:

  • eventTime :用户赋予的自定义的时间戳(事件时间戳)
  • processingTime : 执行当前task的subtask主机的本地时间戳(系统时间戳)

窗口类型:

  • Sliding:滑动窗口,可能会重叠(某个元素可能会身处多个窗口中)
  • Tumbling:非重叠窗口(在assignWindows方法中返回的一般都是Collections.singletonList()

GlobalWindows

该分配器对应于窗口GlobalWindow,它将所有的元素分配给同一个GlobalWindow(本质上而言,GlobalWindow也只有一个实例)。跟GlobalWindow的实现方式一样,GlobalWindows也被实现为单例模式。

方法实现:

  • assignWindows :方法的实现即返回存放GlobalWindow单实例的集合对象
  • getDefaultTrigger :的实现是返回一个不做任何动作的NerverTrigger

TumblingEventTimeWindows

依据给定的窗口大小,结合event-time,返回存储TimeWindow单实例的集合。getDefaultTrigger方法返回EventTimeTrigger类型的实例。

TumblingProcessingTimeWindows

依据给定窗口的大小,结合processing-time,返回存储TimeWindow单实例的集合。需要注意的是,这里依据的是运行当前任务所在主机的本地时间戳。getDefaultTrigger方法返回的是ProcessingTimeTrigger类型的实例。

SlidingProcessingTimeWindows

Sliding窗口不同于Tumbling窗口,它除了指定窗口的大小,还要指定一个滑动值,即slide。所谓的滑动窗口可以这么理解,比如:一分钟里每十秒钟。这里一分钟是窗口大小,每十秒即为滑动值。

在Sliding窗口中,assignWindows方法返回的就不再是单个窗口了,而是窗口的集合。首先计算出窗口的个数:size/slide,然后循环初始化给定的size内不同slide的窗口对象。

SlidingEventTimeWindows

类似SlidingProcessingTimeWindows只不过窗口的start参数的计算方式依赖于系统时间戳。

Evictor

evitor : 中文译为驱逐者;顾名思义其用于剔除窗口中的某些元素

它剔除元素的时机是:在触发器触发之后,在窗口被处理(apply windowFunction)之前

该接口只定义了一个方法:

    int evict(Iterable<StreamRecord<T>> elements, int size, W window);

接口的返回值即表示要剔除元素的个数。

内置的Evitor

Flink内置实现了三个Evitor

  • TimeEvitor
  • CountEvitor
  • DeltaEvitor

TimeEvitor

这个Evitor基于给定的保留时间(keep time)作为剔除规则,大致的实现如下:

    public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
        int toEvict = 0;
        long currentTime = Iterables.getLast(elements).getTimestamp();
        long evictCutoff = currentTime - windowSize;
        for (StreamRecord<Object> record: elements) {
            if (record.getTimestamp() > evictCutoff) {
                break;
            }
            toEvict++;
        }
        return toEvict;
    }

大致的逻辑是,先取出最后一个元素的时间戳作为“当前”时间,然后减去期望中的“窗口大小”,得到一个基准时间戳(只需要比基准时间戳大的元素)。

然后从第一个元素开始循环比较每一个元素,如果比基准时间戳小,则累加剔除统计数,一旦发现某个元素的时间戳大于基准时间戳,则直接跳出循环,不再累加了(因为本地窗口中元素是基于时间有序的,这一点由Flink运行时来保证,如果从某个元素开始其时间戳大于基准时间戳,则后续的所有元素都满足这一条件,因此也就没必要再循环下去了)。

CountEvictor

基于容量的Evictor,它通过比对evict方法的第二个参数size来判断应该剔除多少个元素。具体的实现:

    public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
        if (size > maxCount) {
            return (int) (size - maxCount);
        } else {
            return 0;
        }
    }

DeltaEvictor

基于给定的阈值thresholddeltaFunction来进行判断。也是拿当前元素跟最后一个元素一起计算delta跟阈值做对比。

Time

Flink中仅有一个类Time来定义窗口的时间间隔。该时间默认指执行环境下的时间。创建一个Time对象,需要两个参数:

  • size : 时间间隔的大小(数值)
  • unit : TimeUnit的实例,表示时间间隔的单位

该类提供的很多静态方法提供对不同unit的设置。

Trigger

Trigger(触发器)用于决定某个窗口的元素集合什么时候触发计算以及结果什么时候被emit。

以粗粒度来看,Flink主要提供了三种形式的触发方式:

  • 按元素
  • 按系统时间
  • 按事件时间

这体现为Trigger的三个主要的抽象方法:

  • onElement :针对每个元素触发,这主要针对于那些基于元素的触发器,比如后面我们将看到的CountTrigger
  • onProcessingTime :被processing-time(Flink系统时间时间戳)定时器触发
  • onEventTime :被event-time(事件时间戳)定时器触发

以上这些方法中都有一个共同的参数:TriggerContext

TriggerContext

顾名思义,它提供触发器执行时的上下文信息,但它只是Trigger的内部接口:

  • getCurrentWatermark :返回当前的watermark
  • registerProcessingTimeTimer :注册一个系统时间的定时器,触发onProcessingTime
  • registerEventTimeTimer :注册一个事件时间的定时器,触发onEventTime
  • deleteProcessingTimeTimer :删除系统时间的定时器
  • deleteEventTimeTimer :删除事件时间的定时器
  • getPartitionedState :用于失败恢复的获取状态的接口

其中,registerXXX/deleteXXX模式对主要针对上面两种基于时间的触发器。而最后一个方法getKeyValueState也是非常重要的,因为它用于获取窗口相关的状态,比如后面谈到的一些触发器是依赖于一些上下文状态的,那些状态的获取就是依靠这个方法。

TrigerResult

Trigger中定义的三个触发方法被调用后,最终要返回一个结果以决定触发之后产生的行为(比如是调用window function还是将窗口丢弃),这个定义触发器触发结果行为是通过TriggerResult来表达的。它是一个枚举类型,有这么几个枚举值:

  • FIRE :window将会被应用window Function进行计算,然后将结果emit出去,但元素并没有被清洗,仍然在window中
  • PURGE :清除window中的元素
  • FIRE_AND_PURGE :同时具备FIREPURGE两种属性产生的行为
  • CONTINUE :不做任何操作

内置的Trigger

Flink内置实现了很多触发器,完整的类图如下:

flink-stream-window_trigger-all-class-diagram

这些触发器都具有一些共性,这里一并说明:

  • 由于Flink在Trigger中已事先将各种触发器类型的回调封装为不同的方法(onXXX),所以后续各种不同的触发器类型的核心逻辑将主要在其特定相关的onXXX方法中,而无关的onXXX方法将直接返回TriggerResult.CONTINUE其实个人认为这种设计方式有欠妥当,因为不利于扩展
  • 因为有不少触发类型依赖于上下文的某些状态值(比如下文典型的ContinuousXXXTrigger),这些状态值将通过TriggerContextgetPartitionedState方法进行存取

EventTimeTrigger

基于事件时间的触发器,对应onEventTime

ProcessingTimeTrigger

基于当前系统时间的触发器,对应onProcessingTime

ContinuousEventTimeTrigger

该触发器是基于事件时间的按照指定时间间隔持续触发的触发器,它的首次触发取决于Watermark。首次触发的判断位于onElement中,它注册下一次(也是首次)触发eventTime 定时器的时机,然后将其first状态标识为false。具体实现如下:

    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {

        ValueState<Boolean> first = ctx.getPartitionedState(stateDesc);

        if (first.value()) {
            long start = timestamp - (timestamp % interval);
            long nextFireTimestamp = start + interval;

            ctx.registerEventTimeTimer(nextFireTimestamp);

            first.update(false);
            return TriggerResult.CONTINUE;
        }
        return TriggerResult.CONTINUE;
    }

持续的触发依赖于在onEventTime中不断注册下一次触发的定时器:

    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        ctx.registerEventTimeTimer(time + interval);
        return TriggerResult.FIRE;
    }

ContinuousProcessingTimeTrigger

基于系统时间的按照指定时间间隔持续触发的触发器,它也是基于保存的状态值fire-timestamp来判断是否需要触发,不过它的循环注册过程是在onElement中。

CountTrigger

基于一个给定的累加值触发,由于累加值不是基于时间而是基于元素的,所有其触发机制实现在onElement中,逻辑很简单,先累加如果大于给定的阈值则触发:

    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws IOException {
        ValueState<Long> count = ctx.getPartitionedState(stateDesc);
        long currentCount = count.value() + 1;
        count.update(currentCount);
        if (currentCount >= maxCount) {
            count.update(0L);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

PurgingTrigger

该触发器类似于一个包装器,用于将任何给定的触发器转变成purging触发器。它的实现机制是,它接收一个trigger实例,然后在各个onXXX回调上执行该实例的相应的onXXX并获得TriggerResult的实例,进行相应的判断,最后返回FIRE_AND_PURGE枚举值。

DeltaTrigger

基于DeltaFunction和一个给定的阈值触发,该触发器在最后一个到达元素和当前元素之间计算一个delta值跟给定的阈值比较,如果高于给定的阈值,则触发。因为是基于元素的,所以主要逻辑实现在onElement中。

小结

本篇还是侧重于分析跟窗口有关的概念,就目前来看它们并没有太多的关联性,这一点我们在后续会剖析它们如何关联起来实现完整的窗口机制的。



原文发布时间为:2016-05-10


本文作者:vinoYang


本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
9月前
|
存储 SQL 缓存
Apache Doris & SelectDB 技术能力全面解析
本文将对 Doris & SelectDB 适合的分析场景和技术能力进行概述解析
1516 1
Apache Doris & SelectDB 技术能力全面解析
|
10月前
|
算法 测试技术 C语言
深入理解HTTP/2:nghttp2库源码解析及客户端实现示例
通过解析nghttp2库的源码和实现一个简单的HTTP/2客户端示例,本文详细介绍了HTTP/2的关键特性和nghttp2的核心实现。了解这些内容可以帮助开发者更好地理解HTTP/2协议,提高Web应用的性能和用户体验。对于实际开发中的应用,可以根据需要进一步优化和扩展代码,以满足具体需求。
1034 29
|
10月前
|
前端开发 数据安全/隐私保护 CDN
二次元聚合短视频解析去水印系统源码
二次元聚合短视频解析去水印系统源码
446 4
|
10月前
|
JavaScript 算法 前端开发
JS数组操作方法全景图,全网最全构建完整知识网络!js数组操作方法全集(实现筛选转换、随机排序洗牌算法、复杂数据处理统计等情景详解,附大量源码和易错点解析)
这些方法提供了对数组的全面操作,包括搜索、遍历、转换和聚合等。通过分为原地操作方法、非原地操作方法和其他方法便于您理解和记忆,并熟悉他们各自的使用方法与使用范围。详细的案例与进阶使用,方便您理解数组操作的底层原理。链式调用的几个案例,让您玩转数组操作。 只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
10月前
|
存储 前端开发 JavaScript
在线教育网课系统源码开发指南:功能设计与技术实现深度解析
在线教育网课系统是近年来发展迅猛的教育形式的核心载体,具备用户管理、课程管理、教学互动、学习评估等功能。本文从功能和技术两方面解析其源码开发,涵盖前端(HTML5、CSS3、JavaScript等)、后端(Java、Python等)、流媒体及云计算技术,并强调安全性、稳定性和用户体验的重要性。
|
10月前
|
负载均衡 JavaScript 前端开发
分片上传技术全解析:原理、优势与应用(含简单实现源码)
分片上传通过将大文件分割成多个小的片段或块,然后并行或顺序地上传这些片段,从而提高上传效率和可靠性,特别适用于大文件的上传场景,尤其是在网络环境不佳时,分片上传能有效提高上传体验。 博客不应该只有代码和解决方案,重点应该在于给出解决方案的同时分享思维模式,只有思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
3月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
676 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
存储 Cloud Native 数据处理
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
本文整理自阿里云资深技术专家、Apache Flink PMC 成员梅源在 Flink Forward Asia 新加坡 2025上的分享,深入解析 Flink 状态管理系统的发展历程,从核心设计到 Flink 2.0 存算分离架构,并展望未来基于流批一体的通用增量计算方向。
420 0
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
|
5月前
|
SQL 人工智能 数据挖掘
Apache Flink:从实时数据分析到实时AI
Apache Flink 是实时数据处理领域的核心技术,历经十年发展,已从学术项目成长为实时计算的事实标准。它在现代数据架构中发挥着关键作用,支持实时数据分析、湖仓集成及实时 AI 应用。随着 Flink 2.0 的发布,其在流式湖仓、AI 驱动决策等方面展现出强大潜力,正推动企业迈向智能化、实时化的新阶段。
725 9
Apache Flink:从实时数据分析到实时AI
|
5月前
|
SQL 人工智能 API
Apache Flink 2.1.0: 面向实时 Data + AI 全面升级,开启智能流处理新纪元
Apache Flink 2.1.0 正式发布,标志着实时数据处理引擎向统一 Data + AI 平台迈进。新版本强化了实时 AI 能力,支持通过 Flink SQL 和 Table API 创建及调用 AI 模型,新增 Model DDL、ML_PREDICT 表值函数等功能,实现端到端的实时 AI 工作流。同时增强了 Flink SQL 的流处理能力,引入 Process Table Functions(PTFs)、Variant 数据类型,优化流式 Join 及状态管理,显著提升作业稳定性与资源利用率。
658 0

热门文章

最新文章

推荐镜像

更多
  • DNS