Flink实战(七) - Time & Windows编程(下)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink实战(七) - Time & Windows编程(下)

7 窗口函数

定义窗口分配器后,我们需要指定要在每个窗口上执行的计算。这是窗口函数的职责,窗口函数用于在系统确定窗口准备好进行处理后处理每个(可能是被Keys化的)窗口的数据元


的窗函数可以是一个ReduceFunction,AggregateFunction,FoldFunction或ProcessWindowFunction。前两个可以更有效地执行,因为Flink可以在每个窗口到达时递增地聚合它们的数据元.

ProcessWindowFunction获取Iterable窗口中包含的所有数据元以及有关数据元所属窗口的其他元信息。


具有ProcessWindowFunction的窗口转换不能像其他情况一样有效地执行,因为Flink必须在调用函数之前在内部缓冲窗口的所有数据元。这可以通过组合来减轻ProcessWindowFunction与ReduceFunction,AggregateFunction或FoldFunction以获得两个窗口元件的增量聚合并且该附加元数据窗口 ProcessWindowFunction接收。我们将查看每个变体的示例。


7.1 ReduceFunction

指定如何组合输入中的两个数据元以生成相同类型的输出数据元.

Flink使用ReduceFunction来递增地聚合窗口的数据元.


定义和使用

Java

DataStream<Tuple2<String, Long>> input = ...;
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new ReduceFunction<Tuple2<String, Long>> {
      public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
      }
    });
  • Scala
val input: DataStream[(String, Long)] = ...
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }

原来传递进来的数据是字符串,此处我们就使用数值类型,通过数值类型来演示增量的效果

这里不是等待窗口所有的数据进行一次性处理,而是数据两两处理

17.png

输入

18.png

增量输出

19.png

Java

20.png

7.2 聚合函数

An AggregateFunction是一个通用版本,ReduceFunction它有三种类型:输入类型(IN),累加器类型(ACC)和输出类型(OUT)。输入类型是输入流中数据元的类型,并且AggregateFunction具有将一个输入数据元添加到累加器的方法。该接口还具有用于创建初始累加器的方法,用于将两个累加器合并到一个累加器中以及用于OUT从累加器提取输出(类型)。我们将在下面的示例中看到它的工作原理。


与之相同ReduceFunction,Flink将在窗口到达时递增地聚合窗口的输入数据元。


一个AggregateFunction可以被定义并这样使用:

/**
 * The accumulator is used to keep a running sum and a count. The {@code getResult} method
 * computes the average.
 */
private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }
  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }
  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }
  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}
DataStream<Tuple2<String, Long>> input = ...;
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate());
  • Scala
/**
 * The accumulator is used to keep a running sum and a count. The [getResult] method
 * computes the average.
 */
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
  override def createAccumulator() = (0L, 0L)
  override def add(value: (String, Long), accumulator: (Long, Long)) =
    (accumulator._1 + value._2, accumulator._2 + 1L)
  override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2
  override def merge(a: (Long, Long), b: (Long, Long)) =
    (a._1 + b._1, a._2 + b._2)
}
val input: DataStream[(String, Long)] = ...
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate)

7.3 ProcessWindowFunction

ProcessWindowFunction获取包含窗口的所有数据元的Iterable,以及可访问时间和状态信息的Context对象,这使其能够提供比其他窗口函数更多的灵活性。这是以性能和资源消耗为代价的,因为数据元不能以递增方式聚合,而是需要在内部进行缓冲,直到窗口被认为已准备好进行处理。


ProcessWindowFunction外观签名如下:

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
    /**
     * Evaluates the window and outputs none or several elements.
     *
     * @param key The key for which this window is evaluated.
     * @param context The context in which the window is being evaluated.
     * @param elements The elements in the window being evaluated.
     * @param out A collector for emitting elements.
     *
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    public abstract void process(
            KEY key,
            Context context,
            Iterable<IN> elements,
            Collector<OUT> out) throws Exception;
    /**
     * The context holding window metadata.
     */
    public abstract class Context implements java.io.Serializable {
        /**
         * Returns the window that is being evaluated.
         */
        public abstract W window();
        /** Returns the current processing time. */
        public abstract long currentProcessingTime();
        /** Returns the current event-time watermark. */
        public abstract long currentWatermark();
        /**
         * State accessor for per-key and per-window state.
         *
         * <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
         * by implementing {@link ProcessWindowFunction#clear(Context)}.
         */
        public abstract KeyedStateStore windowState();
        /**
         * State accessor for per-key global state.
         */
        public abstract KeyedStateStore globalState();
    }
}
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {
  /**
    * Evaluates the window and outputs none or several elements.
    *
    * @param key      The key for which this window is evaluated.
    * @param context  The context in which the window is being evaluated.
    * @param elements The elements in the window being evaluated.
    * @param out      A collector for emitting elements.
    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
    */
  def process(
      key: KEY,
      context: Context,
      elements: Iterable[IN],
      out: Collector[OUT])
  /**
    * The context holding window metadata
    */
  abstract class Context {
    /**
      * Returns the window that is being evaluated.
      */
    def window: W
    /**
      * Returns the current processing time.
      */
    def currentProcessingTime: Long
    /**
      * Returns the current event-time watermark.
      */
    def currentWatermark: Long
    /**
      * State accessor for per-key and per-window state.
      */
    def windowState: KeyedStateStore
    /**
      * State accessor for per-key global state.
      */
    def globalState: KeyedStateStore
  }
}

该key参数是通过KeySelector为keyBy()调用指定的Keys提取的Keys。在元组索引键或字符串字段引用的情况下,此键类型始终是Tuple,您必须手动将其转换为正确大小的元组以提取键字段。

A ProcessWindowFunction可以像这样定义和使用:

DataStream<Tuple2<String, Long>> input = ...;
input
  .keyBy(t -> t.f0)
  .timeWindow(Time.minutes(5))
  .process(new MyProcessWindowFunction());
/* ... */
public class MyProcessWindowFunction
    extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
  @Override
  public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
    long count = 0;
    for (Tuple2<String, Long> in: input) {
      count++;
    }
    out.collect("Window: " + context.window() + "count: " + count);
  }
}
val input: DataStream[(String, Long)] = ...
input
  .keyBy(_._1)
  .timeWindow(Time.minutes(5))
  .process(new MyProcessWindowFunction())
/* ... */
class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
  def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): () = {
    var count = 0L
    for (in <- input) {
      count = count + 1
    }
    out.collect(s"Window ${context.window} count: $count")
  }
}

该示例显示了ProcessWindowFunction对窗口中的数据元进行计数的情况。此外,窗口函数将有关窗口的信息添加到输出。

注意注意,使用ProcessWindowFunction简单的聚合(例如count)是非常低效的

21.png

8 水印

  • 推荐阅读

https://blog.csdn.net/lmalds/article/details/52704170

参考

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3月前
|
Java 开发工具
鸿蒙Flutter实战:02-Windows环境搭建踩坑指南
本指南介绍如何搭建鸿蒙Flutter开发环境,包括下载Flutter SDK、配置环境变量(如FLUTTER_STORAGE_BASE_URL、PUB_HOSTED_URL、DEVECO_SDK_HOME等)和检查工具版本。还提到避免项目路径过深、与SDK同盘存放等注意事项,以及解决VsCode无法识别设备的方法。
65 0
|
3月前
|
消息中间件 分布式计算 大数据
大数据-121 - Flink Time Watermark 详解 附带示例详解
大数据-121 - Flink Time Watermark 详解 附带示例详解
89 0
|
3月前
|
人工智能 监控 安全
掌握Windows管理利器:WMI命令实战
本文介绍了Windows Management Instrumentation (WMI) 的基本概念和用途,通过多个实用的`wmic`命令示例,如获取CPU信息、查看操作系统详情、管理服务、检查磁盘空间等,展示了WMI在系统维护中的强大功能。适合IT专业人士学习和参考。
79 4
|
4月前
|
网络协议 API Windows
MASM32编程调用 API函数RtlIpv6AddressToString,windows 10 容易,Windows 7 折腾
MASM32编程调用 API函数RtlIpv6AddressToString,windows 10 容易,Windows 7 折腾
|
4月前
|
Windows
[原创]用MASM32编程获取windows类型
[原创]用MASM32编程获取windows类型
|
4月前
|
JavaScript 前端开发 API
MASM32编程通过WMI获取Windows计划任务
MASM32编程通过WMI获取Windows计划任务
|
5月前
|
API Docker Windows
2024 Ollama 一站式解决在Windows系统安装、使用、定制服务与实战案例
这篇文章是一份关于Ollama工具的一站式使用指南,涵盖了在Windows系统上安装、使用和定制服务,以及实战案例。
2024 Ollama 一站式解决在Windows系统安装、使用、定制服务与实战案例
|
3月前
|
分布式计算 Java 大数据
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
45 0
|
4月前
|
API Windows
MASM32编程获取Windows当前桌面主题名
MASM32编程获取Windows当前桌面主题名
|
5月前
|
数据安全/隐私保护 流计算
Flink四大基石——2.Time
Flink四大基石——2.Time
50 1