Flink实战(七) - Time & Windows编程(下)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink实战(七) - Time & Windows编程(下)

7 窗口函数

定义窗口分配器后,我们需要指定要在每个窗口上执行的计算。这是窗口函数的职责,窗口函数用于在系统确定窗口准备好进行处理后处理每个(可能是被Keys化的)窗口的数据元


的窗函数可以是一个ReduceFunction,AggregateFunction,FoldFunction或ProcessWindowFunction。前两个可以更有效地执行,因为Flink可以在每个窗口到达时递增地聚合它们的数据元.

ProcessWindowFunction获取Iterable窗口中包含的所有数据元以及有关数据元所属窗口的其他元信息。


具有ProcessWindowFunction的窗口转换不能像其他情况一样有效地执行,因为Flink必须在调用函数之前在内部缓冲窗口的所有数据元。这可以通过组合来减轻ProcessWindowFunction与ReduceFunction,AggregateFunction或FoldFunction以获得两个窗口元件的增量聚合并且该附加元数据窗口 ProcessWindowFunction接收。我们将查看每个变体的示例。


7.1 ReduceFunction

指定如何组合输入中的两个数据元以生成相同类型的输出数据元.

Flink使用ReduceFunction来递增地聚合窗口的数据元.


定义和使用

Java

DataStream<Tuple2<String, Long>> input = ...;
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new ReduceFunction<Tuple2<String, Long>> {
      public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
      }
    });
  • Scala
val input: DataStream[(String, Long)] = ...
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }

原来传递进来的数据是字符串,此处我们就使用数值类型,通过数值类型来演示增量的效果

这里不是等待窗口所有的数据进行一次性处理,而是数据两两处理

17.png

输入

18.png

增量输出

19.png

Java

20.png

7.2 聚合函数

An AggregateFunction是一个通用版本,ReduceFunction它有三种类型:输入类型(IN),累加器类型(ACC)和输出类型(OUT)。输入类型是输入流中数据元的类型,并且AggregateFunction具有将一个输入数据元添加到累加器的方法。该接口还具有用于创建初始累加器的方法,用于将两个累加器合并到一个累加器中以及用于OUT从累加器提取输出(类型)。我们将在下面的示例中看到它的工作原理。


与之相同ReduceFunction,Flink将在窗口到达时递增地聚合窗口的输入数据元。


一个AggregateFunction可以被定义并这样使用:

/**
 * The accumulator is used to keep a running sum and a count. The {@code getResult} method
 * computes the average.
 */
private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }
  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }
  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }
  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}
DataStream<Tuple2<String, Long>> input = ...;
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate());
  • Scala
/**
 * The accumulator is used to keep a running sum and a count. The [getResult] method
 * computes the average.
 */
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
  override def createAccumulator() = (0L, 0L)
  override def add(value: (String, Long), accumulator: (Long, Long)) =
    (accumulator._1 + value._2, accumulator._2 + 1L)
  override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2
  override def merge(a: (Long, Long), b: (Long, Long)) =
    (a._1 + b._1, a._2 + b._2)
}
val input: DataStream[(String, Long)] = ...
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate)

7.3 ProcessWindowFunction

ProcessWindowFunction获取包含窗口的所有数据元的Iterable,以及可访问时间和状态信息的Context对象,这使其能够提供比其他窗口函数更多的灵活性。这是以性能和资源消耗为代价的,因为数据元不能以递增方式聚合,而是需要在内部进行缓冲,直到窗口被认为已准备好进行处理。


ProcessWindowFunction外观签名如下:

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
    /**
     * Evaluates the window and outputs none or several elements.
     *
     * @param key The key for which this window is evaluated.
     * @param context The context in which the window is being evaluated.
     * @param elements The elements in the window being evaluated.
     * @param out A collector for emitting elements.
     *
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    public abstract void process(
            KEY key,
            Context context,
            Iterable<IN> elements,
            Collector<OUT> out) throws Exception;
    /**
     * The context holding window metadata.
     */
    public abstract class Context implements java.io.Serializable {
        /**
         * Returns the window that is being evaluated.
         */
        public abstract W window();
        /** Returns the current processing time. */
        public abstract long currentProcessingTime();
        /** Returns the current event-time watermark. */
        public abstract long currentWatermark();
        /**
         * State accessor for per-key and per-window state.
         *
         * <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
         * by implementing {@link ProcessWindowFunction#clear(Context)}.
         */
        public abstract KeyedStateStore windowState();
        /**
         * State accessor for per-key global state.
         */
        public abstract KeyedStateStore globalState();
    }
}
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {
  /**
    * Evaluates the window and outputs none or several elements.
    *
    * @param key      The key for which this window is evaluated.
    * @param context  The context in which the window is being evaluated.
    * @param elements The elements in the window being evaluated.
    * @param out      A collector for emitting elements.
    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
    */
  def process(
      key: KEY,
      context: Context,
      elements: Iterable[IN],
      out: Collector[OUT])
  /**
    * The context holding window metadata
    */
  abstract class Context {
    /**
      * Returns the window that is being evaluated.
      */
    def window: W
    /**
      * Returns the current processing time.
      */
    def currentProcessingTime: Long
    /**
      * Returns the current event-time watermark.
      */
    def currentWatermark: Long
    /**
      * State accessor for per-key and per-window state.
      */
    def windowState: KeyedStateStore
    /**
      * State accessor for per-key global state.
      */
    def globalState: KeyedStateStore
  }
}

该key参数是通过KeySelector为keyBy()调用指定的Keys提取的Keys。在元组索引键或字符串字段引用的情况下,此键类型始终是Tuple,您必须手动将其转换为正确大小的元组以提取键字段。

A ProcessWindowFunction可以像这样定义和使用:

DataStream<Tuple2<String, Long>> input = ...;
input
  .keyBy(t -> t.f0)
  .timeWindow(Time.minutes(5))
  .process(new MyProcessWindowFunction());
/* ... */
public class MyProcessWindowFunction
    extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
  @Override
  public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
    long count = 0;
    for (Tuple2<String, Long> in: input) {
      count++;
    }
    out.collect("Window: " + context.window() + "count: " + count);
  }
}
val input: DataStream[(String, Long)] = ...
input
  .keyBy(_._1)
  .timeWindow(Time.minutes(5))
  .process(new MyProcessWindowFunction())
/* ... */
class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
  def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): () = {
    var count = 0L
    for (in <- input) {
      count = count + 1
    }
    out.collect(s"Window ${context.window} count: $count")
  }
}

该示例显示了ProcessWindowFunction对窗口中的数据元进行计数的情况。此外,窗口函数将有关窗口的信息添加到输出。

注意注意,使用ProcessWindowFunction简单的聚合(例如count)是非常低效的

21.png

8 水印

  • 推荐阅读

https://blog.csdn.net/lmalds/article/details/52704170

参考

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
18天前
|
SQL Java 关系型数据库
Flink DataSet API迁移到DataStream API实战
本文介绍了作者的Flink项目从DataSet API迁移到DataStream API的背景、方法和遇到的问题以及解决方案。
165 3
|
18天前
|
流计算 Windows
Flink窗口与状态编程开发(一)
Flink窗口与状态编程开发(一)
|
18天前
|
Unix 编译器 开发者
Qt5.14.2 轻松掌握Qt中的压缩与解压缩:QuaZIP的神秘面纱与实战演练之windows环境编译
Qt5.14.2 轻松掌握Qt中的压缩与解压缩:QuaZIP的神秘面纱与实战演练之windows环境编译
175 0
|
14天前
|
消息中间件 Kubernetes Java
实时计算 Flink版操作报错合集之写入 Kafka 报错 "Failed to send data to Kafka: Failed to allocate memory within the configured max blocking time 60000 ms",该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
33 0
|
14天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
33 0
|
14天前
|
SQL 消息中间件 Kafka
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
34 0
|
18天前
|
API C++ Windows
windows编程入门_链接错误的配置
windows编程入门_链接错误的配置
20 0
|
18天前
|
传感器 存储 缓存
[尚硅谷flink学习笔记] 实战案例TopN 问题
这段内容是关于如何使用Apache Flink解决实时统计水位传感器数据中,在一定时间窗口内出现次数最多的水位问题,即&quot;Top N&quot;问题。首先,介绍了一个使用滑动窗口的简单实现,通过收集传感器数据,按照水位计数,然后排序并输出前两名。接着,提出了全窗口和优化方案,其中优化包括按键分区(按水位vc分组)、开窗操作(增量聚合计算count)和过程函数处理(聚合并排序输出Top N结果)。最后,给出了一个使用`KeyedProcessFunction`进行优化的示例代码,通过按键by窗口结束时间,确保每个窗口的所有数据到达后再进行处理,提高了效率。
|
18天前
|
分布式计算 监控 API
flink 入门编程day02
flink 入门编程day02
|
18天前
|
Windows
火山中文编程 -- 第一个windows程序
火山中文编程 -- 第一个windows程序
13 0