Apache Flink 漫谈系列(03) - Watermark

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 实际问题(乱序) 在介绍Watermark相关内容之前我们先抛出一个具体的问题,在实际的流式计算中数据到来的顺序对计算结果的正确性有至关重要的影响,比如:某数据源中的某些数据由于某种原因(如:网络原因,外部存储自身原因)会有5秒的延时,也就是在实际时间的第1秒产生的数据有可能在第5秒中产生的数据之后到来(比如到Window处理节点).选具体某个delay的元素来说,假设在一个5秒的Tumble窗口(详见Window介绍章节),有一个EventTime是 11秒的数据,在第16秒时候到来了。

实际问题(乱序)

在介绍Watermark相关内容之前我们先抛出一个具体的问题,在实际的流式计算中数据到来的顺序对计算结果的正确性有至关重要的影响,比如:某数据源中的某些数据由于某种原因(如:网络原因,外部存储自身原因)会有5秒的延时,也就是在实际时间的第1秒产生的数据有可能在第5秒中产生的数据之后到来(比如到Window处理节点).选具体某个delay的元素来说,假设在一个5秒的Tumble窗口(详见Window介绍章节),有一个EventTime是 11秒的数据,在第16秒时候到来了。图示第11秒的数据,在16秒到来了,如下图:
image

那么对于一个Count聚合的Tumble(5s)的window,上面的情况如何处理才能window2=4,window3=2 呢?

Apache Flink的时间类型

开篇我们描述的问题是一个很常见的TimeWindow中数据乱序的问题,乱序是相对于事件产生时间和到达Apache Flink 实际处理算子的顺序而言的,关于时间在Apache Flink中有如下三种时间类型,如下图:

image

  • ProcessingTime
    是数据流入到具体某个算子时候相应的系统时间。ProcessingTime 有最好的性能和最低的延迟。但在分布式计算环境中ProcessingTime具有不确定性,相同数据流多次运行有可能产生不同的计算结果。
  • IngestionTime
    IngestionTime是数据进入Apache Flink框架的时间,是在Source Operator中设置的。与ProcessingTime相比可以提供更可预测的结果,因为IngestionTime的时间戳比较稳定(在源处只记录一次),同一数据在流经不同窗口操作时将使用相同的时间戳,而对于ProcessingTime同一数据在流经不同窗口算子会有不同的处理时间戳。
  • EventTime
    EventTime是事件在设备上产生时候携带的。在进入Apache Flink框架之前EventTime通常要嵌入到记录中,并且EventTime也可以从记录中提取出来。在实际的网上购物订单等业务场景中,大多会使用EventTime来进行数据计算。

开篇描述的问题和本篇要介绍的Watermark所涉及的时间类型均是指EventTime类型。

什么是Watermark

Watermark是Apache Flink为了处理EventTime 窗口计算提出的一种机制,本质上也是一种时间戳,由Apache Flink Source或者自定义的Watermark生成器按照需求Punctuated或者Periodic两种方式生成的一种系统Event,与普通数据流Event一样流转到对应的下游算子,接收到Watermark Event的算子以此不断调整自己管理的EventTime clock。 Apache Flink 框架保证Watermark单调递增,算子接收到一个Watermark时候,框架知道不会再有任何小于该Watermark的时间戳的数据元素到来了,所以Watermark可以看做是告诉Apache Flink框架数据流已经处理到什么位置(时间维度)的方式。 Watermark的产生和Apache Flink内部处理逻辑如下图所示: 
image

Watermark的产生方式

目前Apache Flink 有两种生产Watermark的方式,如下:

  • Punctuated - 数据流中每一个递增的EventTime都会产生一个Watermark。
    在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。
  • Periodic - 周期性的(一定时间间隔或者达到一定的记录条数)产生一个Watermark。在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。

所以Watermark的生成方式需要根据业务场景的不同进行不同的选择。

Watermark的接口定义

对应Apache Flink Watermark两种不同的生成方式,我们了解一下对应的接口定义,如下:

  • Periodic Watermarks - AssignerWithPeriodicWatermarks
/**
 * Returns the current watermark. This method is periodically called by the
 * system to retrieve the current watermark. The method may return {@code null} to
 * indicate that no new Watermark is available.
 *
 * <p>The returned watermark will be emitted only if it is non-null and itsTimestamp
 * is larger than that of the previously emitted watermark (to preserve the contract of
 * ascending watermarks). If the current watermark is still
 * identical to the previous one, no progress in EventTime has happened since
 * the previous call to this method. If a null value is returned, or theTimestamp
 * of the returned watermark is smaller than that of the last emitted one, then no
 * new watermark will be generated.
 *
 * <p>The interval in which this method is called and Watermarks are generated
 * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
 *
 * @see org.Apache.flink.streaming.api.watermark.Watermark
 * @see ExecutionConfig#getAutoWatermarkInterval()
 *
 * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
 */
 @Nullable
 Watermark getCurrentWatermark();
  • Punctuated Watermarks - AssignerWithPunctuatedWatermarks 
public interface AssignerWithPunctuatedWatermarks<T> extendsTimestampAssigner<T> {

/**
 * Asks this implementation if it wants to emit a watermark. This method is called right after
 * the {@link #extractTimestamp(Object, long)} method.
 *
 * <p>The returned watermark will be emitted only if it is non-null and itsTimestamp
 * is larger than that of the previously emitted watermark (to preserve the contract of
 * ascending watermarks). If a null value is returned, or theTimestamp of the returned
 * watermark is smaller than that of the last emitted one, then no new watermark will
 * be generated.
 *
 * <p>For an example how to use this method, see the documentation of
 * {@link AssignerWithPunctuatedWatermarks this class}.
 *
 * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
 */
 @Nullable
Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}

AssignerWithPunctuatedWatermarks 继承了TimestampAssigner接口 -TimestampAssigner

public interfaceTimestampAssigner<T> extends Function {

/**
 * Assigns aTimestamp to an element, in milliseconds since the Epoch.
 *
 * <p>The method is passed the previously assignedTimestamp of the element.
 * That previousTimestamp may have been assigned from a previous assigner,
 * by ingestionTime. If the element did not carry aTimestamp before, this value is
 * {@code Long.MIN_VALUE}.
 *
 * @param element The element that theTimestamp is wil be assigned to.
 * @param previousElementTimestamp The previous internalTimestamp of the element,
 *                                 or a negative value, if noTimestamp has been assigned, yet.
 * @return The newTimestamp.
 */
long extractTimestamp(T element, long previousElementTimestamp);
}

从接口定义可以看出,Watermark可以在Event(Element)中提取EventTime,进而定义一定的计算逻辑产生Watermark的时间戳。

Watermark解决如上问题

从上面的Watermark生成接口和Apache Flink内部对Periodic Watermark的实现来看,Watermark的时间戳可以和Event中的EventTime 一致,也可以自己定义任何合理的逻辑使得Watermark的时间戳不等于Event中的EventTime,Event中的EventTime自产生那一刻起就不可以改变了,不受Apache Flink框架控制,而Watermark的产生是在Apache Flink的Source节点或实现的Watermark生成器计算产生(如上Apache Flink内置的 Periodic Watermark实现), Apache Flink内部对单流或多流的场景有统一的Watermark处理。

回过头来我们在看看Watermark机制如何解决上面的问题,上面的问题在于如何将迟来的EventTime 位11的元素正确处理。要解决这个问题我们还需要先了解一下EventTime window是如何触发的? EventTime window 计算条件是当Window计算的Timer时间戳 小于等于 当前系统的Watermak的时间戳时候进行计算。 

  • 当Watermark的时间戳等于Event中携带的EventTime时候,上面场景(Watermark=EventTime)的计算结果如下:!image

 上面对应的DDL(Alibaba 企业版的Flink分支)定义如下:
 
CREATE TABLE source(
  ...,
  Event_timeTimeStamp,
  WATERMARK wk1 FOR Event_time as withOffset(Event_time, 0)
) with (
  ...
);
 

  • 如果想正确处理迟来的数据可以定义Watermark生成策略为 Watermark = EventTime -5s, 如下:
    image

上面对应的DDL(Alibaba 内部的DDL语法,目前正在和社区讨论)定义如下: 
CREATE TABLE source(
  ...,
  Event_timeTimeStamp,
  WATERMARK wk1 FOR Event_time as withOffset(Event_time, 5000)
) with (
  ...
);
上面正确处理的根源是我们采取了 延迟触发 window 计算 的方式正确处理了 Late Event. 与此同时,我们发现window的延时触发计算,也导致了下游的LATENCY变大,本例子中下游得到window的结果就延迟了5s.

多流的Watermark处理

在实际的流计算中往往一个job中会处理多个Source的数据,对Source的数据进行GroupBy分组,那么来自不同Source的相同key值会shuffle到同一个处理节点,并携带各自的Watermark,Apache Flink内部要保证Watermark要保持单调递增,多个Source的Watermark汇聚到一起时候可能不是单调自增的,这样的情况Apache Flink内部是如何处理的呢?如下图所示:

image

 
Apache Flink内部实现每一个边上只能有一个递增的Watermark, 当出现多流携带Eventtime汇聚到一起(GroupBy or Union)时候,Apache Flink会选择所有流入的Eventtime中最小的一个向下游流出。从而保证watermark的单调递增和保证数据的完整性.如下图:
 
image

 

小结

本节以一个流计算常见的乱序问题介绍了Apache Flink如何利用Watermark机制来处理乱序问题. 本篇内容在一定程度上也体现了EventTime Window中的Trigger机制依赖了Watermark(后续Window篇章会介绍)。Watermark机制是流计算中处理乱序,正确处理Late Event的核心手段。

关于点赞和评论

本系列文章难免有很多缺陷和不足,真诚希望读者对有收获的篇章给予点赞鼓励,对有不足的篇章给予反馈和建议,先行感谢大家!

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
29天前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
2月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
116 2
|
2月前
|
消息中间件 运维 Kafka
Apache Flink 实践问题之达到网卡的最大速度如何解决
Apache Flink 实践问题之达到网卡的最大速度如何解决
37 2
|
21天前
|
消息中间件 资源调度 API
Apache Flink 流批融合技术介绍
本文源自阿里云高级研发工程师周云峰在Apache Asia Community OverCode 2024的分享,内容涵盖从“流批一体”到“流批融合”的演进、技术解决方案及社区进展。流批一体已在API、算子和引擎层面实现统一,但用户仍需手动配置作业模式。流批融合旨在通过动态调整优化策略,自动适应不同场景需求。文章详细介绍了如何通过量化指标(如isProcessingBacklog和isInsertOnly)实现这一目标,并展示了针对不同场景的具体优化措施。此外,还概述了社区当前进展及未来规划,包括将优化方案推向Flink社区、动态调整算子流程结构等。
292 31
Apache Flink 流批融合技术介绍
|
2月前
|
数据采集 分布式计算 Kubernetes
Apache Flink 实践问题之ZooKeeper 网络瞬断时如何解决
Apache Flink 实践问题之ZooKeeper 网络瞬断时如何解决
47 4
|
2月前
|
Java 微服务 Spring
驾驭复杂性:Spring Cloud在微服务构建中的决胜法则
【8月更文挑战第31天】Spring Cloud是在Spring Framework基础上打造的微服务解决方案,提供服务发现、配置管理、消息路由等功能,适用于构建复杂的微服务架构。本文介绍如何利用Spring Cloud搭建微服务,包括Eureka服务发现、Config Server配置管理和Zuul API网关等组件的配置与使用。通过Spring Cloud,可实现快速开发、自动化配置,并提升系统的伸缩性和容错性,尽管仍需面对分布式事务等挑战,但其强大的社区支持有助于解决问题。
51 0
|
2月前
|
消息中间件 Java 数据处理
揭秘Apache Flink的Exactly-Once神技:如何在数据流海中确保每条信息精准无误,不丢不重?
【8月更文挑战第26天】Apache Flink 是一款先进的流处理框架,其核心特性 Exactly-Once 语义保证了数据处理的精准无误。尤其在金融及电商等高要求场景下,该特性极为关键。本文深入解析 Flink 如何实现 Exactly-Once 语义:通过状态管理确保中间结果可靠存储;利用一致的检查点机制定期保存状态快照;以及通过精确的状态恢复避免数据重复处理或丢失。最后,提供一个 Java 示例,展示如何计算用户访问次数,并确保 Exactly-Once 语义的应用。
56 0
|
2月前
|
监控 Apache 流计算
时间的守卫者:揭秘Flink中Watermark如何掌控数据流的时空秩序?
【8月更文挑战第26天】Apache Flink是一款功能强大的流处理框架,其Watermark机制为核心,确保了系统即使面对数据乱序或延迟也能准确处理时间相关的特性。Watermark作为一种特殊事件,标记了所有在此之前发生事件的最晚时间点,这对于时间窗口操作至关重要。
45 0
|
SQL 架构师 API
《Apache Flink 知其然,知其所以然》系列视频课程
# 课程简介 目前在我的公众号新推出了《Apache Flink 知其然,知其所以然》的系列视频课程。在内容上会先对Flink整体架构和所适用的场景做一个基础介绍,让你对Flink有一个整体的认识!然后对核心概念进行详细介绍,让你深入了解流计算中一些核心术语的含义,然后对Flink 各个层面的API,如 SQL/Table&DataStreamAPI/PythonAPI 进行详细的介绍,以及
1350 0
《Apache Flink 知其然,知其所以然》系列视频课程
|
2月前
|
存储 消息中间件 Java
Apache Flink 实践问题之原生TM UI日志问题如何解决
Apache Flink 实践问题之原生TM UI日志问题如何解决
40 1

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多