Flink流处理之窗口算子分析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 窗口算子WindowOperator是窗口机制的底层实现,它几乎会牵扯到所有窗口相关的知识点,因此相对复杂。本文将以由面及点的方式来分析WindowOperator的实现。首先,我们来看一下对于最常见的时间窗口(包含处理时间和事件时间)其执行示意图: 上图中,左侧从左往右为事件流的方向。

窗口算子WindowOperator是窗口机制的底层实现,它几乎会牵扯到所有窗口相关的知识点,因此相对复杂。本文将以由面及点的方式来分析WindowOperator的实现。首先,我们来看一下对于最常见的时间窗口(包含处理时间和事件时间)其执行示意图:

window-operator-work-detail

上图中,左侧从左往右为事件流的方向。方框代表事件,事件流中夹杂着的竖直虚线代表水印,Flink通过水印分配器(TimestampsAndPeriodicWatermarksOperator和TimestampsAndPunctuatedWatermarksOperator这两个算子)向事件流中注入水印。元素在streaming dataflow引擎中流动到WindowOperator时,会被分为两拨,分别是普通事件和水印。

如果是普通的事件,则会调用processElement方法(上图虚线框中的三个圆圈中的一个)进行处理,在processElement方法中,首先会利用窗口分配器为当前接收到的元素分配窗口,接着会调用触发器的onElement方法进行逐元素触发。对于时间相关的触发器,通常会注册事件时间或者处理时间定时器,这些定时器会被存储在WindowOperator的处理时间定时器队列和水印定时器队列中(见图中虚线框中上下两个圆柱体),如果触发的结果是FIRE,则对窗口进行计算。

如果是水印(事件时间场景),则方法processWatermark将会被调用,它将会处理水印定时器队列中的定时器。如果时间戳满足条件,则利用触发器的onEventTime方法进行处理。

而对于处理时间的场景,WindowOperator将自身实现为一个基于处理时间的触发器,以触发trigger方法来消费处理时间定时器队列中的定时器满足条件则会调用窗口触发器的onProcessingTime,根据触发结果判断是否对窗口进行计算。

以上是WindowOperator的常规流程最简单的表述,事实上其逻辑要复杂得多。我们首先分解掉几个内部核心对象,上图中我们看到有两个队列:分别是水印定时器队列和处理时间定时器队列。这里的定时器是什么?它有什么作用呢?接下来我们就来看看它的定义——WindowOperator的内部类Timer。Timer是所有时间窗口执行的基础,它其实是一个上下文对象,封装了三个属性:

  • timestamp:触发器触发的时间戳;
  • key:当前元素所归属的分组的键;
  • window:当前元素所属窗口;

在我们讲解窗口触发器时,我们曾提及过触发器上下文对象,它作为process系列方法参数。在WindowOperator内部我们终于看到了对该上下文对象接口的实现——Context,它主要提供了三种类型的方法:

  • 提供状态存储与访问;
  • 定时器的注册与删除;
  • 窗口触发器process系列方法的包装;

在注册定时器时,会新建定时器对象并将其加入到定时器队列中。等到时间相关的处理方法(processWatermark和trigger)被触发调用,则会从定时器队列中消费定时器对象并调用窗口触发器,然后根据触发结果来判断是否触动窗口的计算。我们选择事件时间的处理方法processWatermark进行分析(处理时间的处理方法trigger跟其类似):

public void processWatermark(Watermark mark) throws Exception {
    //定义一个标识,表示是否仍有定时器满足触发条件   
    boolean fire;   
    do {
        //从水印定时器队列中查找队首的一个定时器,注意此处并不是出队(注意跟remove方法的区别)      
        Timer<K, W> timer = watermarkTimersQueue.peek();      
        //如果定时器存在,且其时间戳戳不大于水印的时间戳
        //(注意理解条件是:不大于,水印用于表示小于该时间戳的元素都已到达,所以所有不大于水印的触发时间戳都该被触发)
        if (timer != null && timer.timestamp <= mark.getTimestamp()) {
            //置标识为真,表示找到满足触发条件的定时器         
            fire = true;         
            //将该元素从队首出队
            watermarkTimers.remove(timer);         
            watermarkTimersQueue.remove();
            //构建新的上下文         
            context.key = timer.key;         
            context.window = timer.window;         
            setKeyContext(timer.key);         
            //窗口所使用的状态存储类型为可追加的状态存储
            AppendingState<IN, ACC> windowState;         
            MergingWindowSet<W> mergingWindows = null;         
            //如果分配器是合并分配器(比如会话窗口)
            if (windowAssigner instanceof MergingWindowAssigner) {
                //获得合并窗口帮助类MergingWindowSet的实例            
                mergingWindows = getMergingWindowSet();            
                //获得当前窗口对应的状态窗口(状态窗口对应着状态后端存储的命名空间)
                W stateWindow = mergingWindows.getStateWindow(context.window);            
                //如果没有对应的状态窗口,则跳过本次循环
                if (stateWindow == null) {                              
                    continue;            
                }
                //获得当前窗口对应的状态表示            
                windowState = getPartitionedState(stateWindow, 
                    windowSerializer, windowStateDescriptor);         
            } else {
                //如果不是合并分配器,则直接获取窗口对应的状态表示            
                windowState = getPartitionedState(context.window, 
                    windowSerializer, windowStateDescriptor);         
            }
            //从窗口状态表示中获得窗口中所有的元素         
            ACC contents = windowState.get();         
            if (contents == null) {            
                // if we have no state, there is nothing to do            
                continue;         
            }
            //通过上下文对象调用窗口触发器的事件时间处理方法并获得触发结果对象
            TriggerResult triggerResult = context.onEventTime(timer.timestamp);         
            //如果触发的结果是FIRE(触动窗口计算),则调用fire方法进行窗口计算
            if (triggerResult.isFire()) {            
                fire(context.window, contents);         
            }
            //而如果触动的结果是清理窗口,或者事件时间等于窗口的清理时间(通常为窗口的maxTimestamp属性)         
            if (triggerResult.isPurge() || 
                (windowAssigner.isEventTime() 
                    && isCleanupTime(context.window, timer.timestamp))) {
                //清理窗口及元素            
                cleanup(context.window, windowState, mergingWindows);         
            }      
        } else {
            //队列中没有符合条件的定时器,置标识为否,终止循环         
            fire = false;      
        }   
    } while (fire);   
    //向下游发射水印
    output.emitWatermark(mark);   
    //将当前算子的水印属性用新水印的时间戳覆盖
    this.currentWatermark = mark.getTimestamp();
}

以上方法虽然冗长但流程还算清晰,其中的fire方法用于对窗口进行计算,它会调用内部窗口函数(即InternalWindowFunction,它包装了WindowFunction)的apply方法。

而isCleanupTime和cleanup这对方法主要涉及到窗口的清理。如果当前窗口是时间窗口,且窗口的时间到达了清理时间,则会进行清理窗口清理。那么清理时间如何判断呢?Flink是通过窗口的最大时间戳属性结合允许延迟的时间联合计算的:

private long cleanupTime(W window) {
    //清理时间被预置为窗口的最大时间戳加上允许的延迟事件   
    long cleanupTime = window.maxTimestamp() + allowedLateness;
    //如果窗口为非时间窗口(其maxTimestamp属性值为Long.MAX_VALUE),则其加上允许延迟的时间,
    //会造成Long溢出,从而会变成负数,导致cleanupTime < window.maxTimestamp 条件成立,
    //则直接将清理时间设置为Long.MAX_VALUE   
    return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
}

求出清理时间后会与定时器注册的时间进行对比,如果两者相等则布尔条件为真,否则为假:

protected final boolean isCleanupTime(W window, long time) {   
    long cleanupTime = cleanupTime(window);   
    return  cleanupTime == time;
}

下面我们来看一下清理方法主要做了哪些事情:

private void cleanup(W window,               
    AppendingState<IN, ACC> windowState,               
    MergingWindowSet<W> mergingWindows) throws Exception {
    //清空窗口对应的状态后端的状态   
    windowState.clear();
    //如果支持窗口合并,则清空窗口合并集合中对应当前窗口的记录   
    if (mergingWindows != null) {   
        mergingWindows.retireWindow(window);   
    }
    //清空上下文对象状态   
    context.clear();
}

关于窗口清理,其实三大处理方法(processElement\/processWatermark\/trigger)都会进行判断,如果满足条件则清理。而真正注册清理定时器的逻辑在processElement中,它会调用registerCleanupTimer方法:

protected void registerCleanupTimer(W window) {
    //这里注册的时间即为计算过了的清理时间   
    long cleanupTime = cleanupTime(window);
    //根据不同的时间分类调用不同的注册方法   
    if (windowAssigner.isEventTime()) {      
        context.registerEventTimeTimer(cleanupTime);   
    } else {      
        context.registerProcessingTimeTimer(cleanupTime);   
    }
}

从上面的代码段可知:清理定时器跟普通定时器是一样的。

如果没有延迟,对于事件时间和处理时间而言,也许它们的窗口清理不一定是由清理定时器触发。因为在事件时间触发器和处理时间触发器中,它们注册的定时器对应的时间点就是窗口的最大时间戳。由于这些定时器在队列中一般排在清理定时器之前,所以这些定时器会优先于清理定时器得到执行(优先触发窗口的清理)。而这里的registerCleanupTimer方法,是一般化的清理机制,针对所有类型的窗口都适用,并确保窗口一定会得到清理。而对于刚刚提到的这种情况,重复的“清理”定时器并不会产生负作用。

WindowOperator还有一个继承者:EvictingWindowOperator,该算子在常规的窗口算子上支持了元素驱逐器(见上图中大虚线框内部的小虚线长方形)。EvictingWindowOperator特别的地方主要在于其fire的实现——在进行窗口计算之前会预先对符合驱逐条件的元素进行剔除,具体实现见如下代码:

private void fire(W window, Iterable<StreamRecord<IN>> contents) throws Exception {   
    timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());   
    //计算要驱逐的元素个数   
    int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window);   
    FluentIterable<IN> projectedContents = FluentIterable      
        .from(contents)      
        .skip(toEvict)      
        .transform(new Function<StreamRecord<IN>, IN>() {         
            @Override         
            public IN apply(StreamRecord<IN> input) {            
                return input.getValue();         
            }      
        });   
    userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
}

在最终调用窗口计算的apply方法之前,会先计算要驱逐的元素个数,然后跳过这些元素并且跳过的都是从首个元素开始的连续个元素(这一点在之前我们分析窗口元素驱逐器是也曾提及过)。

这里采用了Guava类库的FluentIterable帮助类,它扩展了Iterable接口并提供了非常丰富的扩展API。



原文发布时间为:2016-10-29


本文作者:vinoYang


本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
SQL 消息中间件 分布式计算
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
67 5
|
1月前
|
SQL 消息中间件 分布式计算
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
96 0
|
1月前
|
SQL 分布式计算 大数据
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
72 0
|
3月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
241 2
|
3月前
|
SQL 存储 Unix
Flink SQL 在快手实践问题之设置 Window Offset 以调整窗口划分如何解决
Flink SQL 在快手实践问题之设置 Window Offset 以调整窗口划分如何解决
60 2
|
1月前
|
消息中间件 NoSQL Java
Flink-06 Flink Java 3分钟上手 滚动窗口 时间驱动 Kafka TumblingWindow TimeWindowFunction TumblingProcessing
Flink-06 Flink Java 3分钟上手 滚动窗口 时间驱动 Kafka TumblingWindow TimeWindowFunction TumblingProcessing
41 0
|
1月前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
78 0
|
3月前
|
SQL 流计算
Flink SQL 在快手实践问题之CUMULATE窗口的划分逻辑如何解决
Flink SQL 在快手实践问题之CUMULATE窗口的划分逻辑如何解决
92 2
|
3月前
|
SQL 流计算
Flink SQL 在快手实践问题之Window TVF改进窗口聚合功能如何解决
Flink SQL 在快手实践问题之Window TVF改进窗口聚合功能如何解决
30 1
|
3月前
|
消息中间件 数据挖掘 Kafka
揭秘数据洪流中的救世主:Confluent与Flink的实时分析奇迹!
【8月更文挑战第9天】在现代数据处理中,实时数据分析至关重要。Confluent Platform与Apache Flink的组合提供了一套高效的数据流处理方案。Confluent Platform基于Apache Kafka构建,负责数据的收集、传输及管理;而Flink则擅长实时处理这些数据流,进行复杂分析。首先需配置Confluent Platform,包括设置Zookeeper、Kafka brokers及相关服务。
81 1

热门文章

最新文章

下一篇
无影云桌面