Flink源码分析: 窗口机制的执行流程

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 这篇文章主要是研究一下Flink的window执行流程,但是不会详细的分析代码实现的细节,因为这部分的代码还是非常多的,先了解一下代码执行的整个流程,为后面分析WindowOperator的源码实现逻辑做一个铺垫.关于Flink的window使用相信大家都比较熟悉了,日常开发中很多场景都会用到window,可以说window是Flink流计算的核心功能之一,我们先来看下官网对于window的使用流程介绍.(这里以keyed Windows为例).

这篇文章主要是研究一下Flink的window执行流程,但是不会详细的分析代码实现的细节,因为这部分的代码还是非常多的,先了解一下代码执行的整个流程,为后面分析WindowOperator的源码实现逻辑做一个铺垫.


关于Flink的window使用相信大家都比较熟悉了,日常开发中很多场景都会用到window,可以说window是Flink流计算的核心功能之一,我们先来看下官网对于window的使用流程介绍.(这里以keyed Windows为例).


stream       .keyBy(...)               <-  keyed versus non-keyed windows       .window(...)              <-  required: "assigner"      [.trigger(...)]            <-  optional: "trigger" (else default trigger)      [.evictor(...)]            <-  optional: "evictor" (else no evictor)      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)       .reduce/aggregate/fold/apply()      <-  required: "function"      [.getSideOutput(...)]      <-  optional: "output tag"

这里的组件我就不在一一介绍了,可以看到有些是必须选择的,有一些是有默认值的,还有一些是可选择的,不同的场景使用方法也不一样.


在看源码之前先来看一下Flink数据流上的类型和操作.下图展示了 Flink 中目前支持的主要几种流的类型,以及它们之间的转换关系。



从上图可以看到所有的transform操作都是基于DataStream,然后经过transform操作又返回了DataStream类型(中间可能有别的类型但是最终都又回到了DataStream类型),对于window来说,首先DataStream经过keyby返回的是KeyedStream类型,用来表示根据指定的key进行分组的数据流,然后在经过window返回的是WindowedStream流类型,代表了根据key分组,并且基于WindowAssigner切分窗口的数据流,然后在对WindowedStream做比如sum,max,或者自定义的process函数操作后,最终又会返回DataStream类型.


需要注意的是WindowedStream并不是DataStream的子类,KeyedStream是DataStream的子类,但是他们都是在同一个包下面的.


网络异常,图片无法展示
|


我们先来看一下KeyedStream中的window的源码如下:


@PublicEvolving  public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {    return new WindowedStream<>(this, assigner);  }


可以看到它返回的是一个WindowedStream类型的数据,然后接着看WindowedStream的源码


public class WindowedStream<T, K, W extends Window> {  /** The keyed data stream that is windowed by this stream. */  private final KeyedStream<T, K> input;  /** The window assigner. */  private final WindowAssigner<? super T, W> windowAssigner;  /** The trigger that is used for window evaluation/emission. */  private Trigger<? super T, ? super W> trigger;  /** The evictor that is used for evicting elements before window evaluation. */  private Evictor<? super T, ? super W> evictor;  /** The user-specified allowed lateness. */  private long allowedLateness = 0L;  /**   * Side output {@code OutputTag} for late data. If no tag is set late data will simply be   * dropped.    */  private OutputTag<T> lateDataOutputTag;  @PublicEvolving  public WindowedStream(KeyedStream<T, K> input,      WindowAssigner<? super T, W> windowAssigner) {    this.input = input;    this.windowAssigner = windowAssigner;    this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());  }  }


这里只贴了部分代码,可以看到WindowedStream的成员变量有KeyedStream,WindowAssigner,Trigger 等 这些不就是上面window程序结构里面的东西吗 上面的window操作就是调用了WindowedStream的构造方法,初始化了窗口分配器,输入数据,trigger几个变量.那Flink是怎么把这些组件串起来调用的呢? 就在我们自定义的窗口函数里面.


@Internal  public <R> SingleOutputStreamOperator<R> process(ProcessWindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {    function = input.getExecutionEnvironment().clean(function);    return apply(new InternalIterableProcessWindowFunction<>(function), resultType, function);  }


这个方法是将给定的窗口函数应用于每个窗口。分别为每个窗口函数调用我们自己定义的方法.也就是我们自己定义的apply/process方法,不管是apply还是process最终都会调用apply方法.


private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, Function originalFunction) {
    final String opName = generateOperatorName(windowAssigner, trigger, evictor, originalFunction, null);    KeySelector<T, K> keySel = input.getKeySelector();
    WindowOperator<K, T, Iterable<T>, R, W> operator;
    if (evictor != null) {      @SuppressWarnings({"unchecked", "rawtypes"})      TypeSerializer<StreamRecord<T>> streamRecordSerializer =          (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
      ListStateDescriptor<StreamRecord<T>> stateDesc =          new ListStateDescriptor<>("window-contents", streamRecordSerializer);
      operator =        new EvictingWindowOperator<>(windowAssigner,          windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),          keySel,          input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),          stateDesc,          function,          trigger,          evictor,          allowedLateness,          lateDataOutputTag);
    } else {      ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",        input.getType().createSerializer(getExecutionEnvironment().getConfig()));
      operator =        new WindowOperator<>(windowAssigner,          windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),          keySel,          input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),          stateDesc,          function,          trigger,          allowedLateness,          lateDataOutputTag);    }
    return input.transform(opName, resultType, operator);  }


apply这个方法中间经过各种操作,最终调用了input.transform方法,这个方法返回的是SingleOutputStreamOperator类型的数据,SingleOutputStreamOperator又是DataStream的一个子类,所以最终还是返回了DataStream类型,和上面说的数据流上的类型和操作可以对应上.


apply方法的逻辑非常简单,主要就是判断evictor是否为空,然后分别走了不同的WindowOperator,其实最终真正的处理数据的逻辑是在WindowOperator里面的,window中的windowAssigner 、trigger、evictor这些都是被封装到


WindowOperator这个类里面的,然后又调用了DataStream的transform方法最终返回的还是DataStream类型. 到这里相信大家对整个流程会清晰一点了.


这篇文章就先介绍到这里,后面会单独分析WindowOperator的源码,因为这部分的关系还是比较复杂的.


总结一下:


这篇文章主要介绍了Flink的Window机制内部执行的流程,首先是DataStream经过keyby返回keyedstream类型,然后在经过window操作返回windowedstream类型,然后在windowedstream上调用我们自己定义的apply或者process方法,最终都会调用apply方法,这个方法里会把所有的东西封装到WindowOperator里面,最后调用transform方法返回DataStream流类型的数据,有一种跑了一圈又回到原地的感觉.


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
89 3
|
2月前
|
SQL 消息中间件 分布式计算
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
110 0
|
2月前
|
SQL 分布式计算 大数据
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
112 0
|
6天前
|
数据处理 数据安全/隐私保护 流计算
Flink 三种时间窗口、窗口处理函数使用及案例
Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。
64 27
|
4月前
|
SQL 存储 Unix
Flink SQL 在快手实践问题之设置 Window Offset 以调整窗口划分如何解决
Flink SQL 在快手实践问题之设置 Window Offset 以调整窗口划分如何解决
72 2
|
4月前
|
存储 数据处理 Apache
超越传统数据库:揭秘Flink状态机制,让你的数据处理效率飞升!
【8月更文挑战第26天】Apache Flink 在流处理领域以其高效实时的数据处理能力脱颖而出,其核心特色之一便是状态管理机制。不同于传统数据库依靠持久化存储及 ACID 事务确保数据一致性和可靠性,Flink 利用内存中的状态管理和分布式数据流模型实现了低延迟处理。Flink 的状态分为键控状态与非键控状态,前者依据数据键值进行状态维护,适用于键值对数据处理;后者与算子实例关联,用于所有输入数据共享的状态场景。通过 checkpointing 机制,Flink 在保障状态一致性的同时,提供了更适合流处理场景的轻量级解决方案。
66 0
|
6月前
|
消息中间件 存储 NoSQL
Flink(十二)【容错机制】(4)
Flink(十二)【容错机制】
|
2月前
|
消息中间件 NoSQL Java
Flink-06 Flink Java 3分钟上手 滚动窗口 时间驱动 Kafka TumblingWindow TimeWindowFunction TumblingProcessing
Flink-06 Flink Java 3分钟上手 滚动窗口 时间驱动 Kafka TumblingWindow TimeWindowFunction TumblingProcessing
50 0
|
4月前
|
消息中间件 监控 Kafka
联通实时计算平台问题之实时计算平台的数据处理流程是什么样的
联通实时计算平台问题之实时计算平台的数据处理流程是什么样的
|
4月前
|
SQL 流计算
Flink SQL 在快手实践问题之CUMULATE窗口的划分逻辑如何解决
Flink SQL 在快手实践问题之CUMULATE窗口的划分逻辑如何解决
102 2