[Flink]Flink1.3 Stream指南八 图解事件时间与Watermarks

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 如果你正在构建实时流处理应用程序,那么事件时间处理是你迟早必须使用的功能之一。因为在现实世界的大多数用例中,消息到达都是无序的,应该有一些方法,通过你建立的系统知道消息可能延迟到达,并且有相应的处理方案。

如果你正在构建实时流处理应用程序,那么事件时间处理是你迟早必须使用的功能之一。因为在现实世界的大多数用例中,消息到达都是无序的,应该有一些方法,通过你建立的系统知道消息可能延迟到达,并且有相应的处理方案。在这篇博文中,我们将看到为什么我们需要事件时间处理,以及我们如何在ApacheFlink中使用它。

EventTime是事件在现实世界中发生的时间,ProcessingTime是Flink系统处理该事件的时间。要了解事件时间处理的重要性,我们首先要建立一个基于处理时间的系统,看看它的缺点。

我们创建一个大小为10秒的滑动窗口,每5秒滑动一次,在窗口结束时,系统将发送在此期间收到的消息数。 一旦了解了EventTime处理在滑动窗口如何工作,那么了解其在滚动窗口中如何工作也就不是难事。所以让我们开始吧。

1. 基于处理时间的系统

在这个例子中,我们期望消息具有一定格式的值,时间戳就是消息的那个值,同时时间戳是在源产生此消息的时间。由于我们正在构建基于处理时间的系统,因此以下代码忽略了时间戳部分。

我们需要知道消息中应包含消息产生时间是很重要的。Flink或任何其他系统不是一个魔术盒,可以以某种方式自己生成这个产生时间。稍后我们将看到,事件时间处理提取此时间戳信息来处理延迟消息。

val text = senv.socketTextStream("localhost", 9999)
val counts = text.map {(m: String) => (m.split(",")(0), 1) }
    .keyBy(0)
    .timeWindow(Time.seconds(10), Time.seconds(5))
    .sum(1)
counts.print
senv.execute("ProcessingTime processing example")

1.1 消息到达无延迟

假设源分别在第13秒产生两个类型a的消息以及在第16秒产生一个消息。(小时和分钟不重要,因为窗口大小只有10秒)。

这些消息将落入如下所示窗口中。前两个在第13秒产生的消息将落入窗口1[5s-15s]和窗口2[10s-20s]中,第三个在第16秒产生的消息将落入窗口2[10s-20s]和窗口3[15s-25s]中。每个窗口发出的最终计数分别为(a,2),(a,3)和(a,1)。

该输出跟预期的输出是一样的。现在我们看看当一个消息延迟到达系统时会发生什么。

1.2 消息延迟到达

现在假设其中一条消息(在第13秒产生)延迟到达6秒(第19秒到达),可能是由于某些网络拥塞。你能猜测这个消息会落入哪个窗口?

延迟的消息落入窗口2和窗口3中,因为19在10-20和15-25之间。在窗口2中计算没有任何问题(因为消息本应该落入这个窗口),但是它影响了窗口1和窗口3的计算结果。我们现在将尝试使用EventTime处理来解决这个问题。

2. 基于EventTime的系统

要使用EventTime处理,我们需要一个时间戳提取器,从消息中提取事件时间信息。请记住,消息是有格式值,时间戳。 extractTimestamp方法获取时间戳部分并将其作为Long类型返回。现在忽略getCurrentWatermark方法,我们稍后再回来:

class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable {
  override def extractTimestamp(e: String, prevElementTimestamp: Long) = {
    e.split(",")(1).toLong
  }
  override def getCurrentWatermark(): Watermark = {
      new Watermark(System.currentTimeMillis)
  }
}

现在我们需要设置这个时间戳提取器,并将TimeCharactersistic设置为EventTime。其余的代码与ProcessingTime的情况保持一致:

senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val text = senv.socketTextStream("localhost", 9999)
                .assignTimestampsAndWatermarks(new TimestampExtractor)
val counts = text.map {(m: String) => (m.split(",")(0), 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(10), Time.seconds(5))
      .sum(1)
counts.print
senv.execute("EventTime processing example")

运行上述代码的结果如下图所示:

结果看起来更好一些,窗口2和3现在是正确的结果,但是窗口1仍然是有问题的。Flink没有将延迟的消息分配给窗口3,因为现在检查消息的事件时间,并且知道它不应该在那个窗口中。但是为什么没有将消息分配给窗口1?原因是当延迟的信息到达系统时(第19秒),窗口1的评估( evaluation)已经完成了(第15秒)。现在让我们尝试通过使用Watermark来解决这个问题。

3. Watermark

Watermark是一个非常重要同时也是非常有趣的想法,我将尽力给你一个简短的概述。如果你有兴趣了解更多信息,你可以从Google中观看这个令人敬畏的演讲,还可以从dataArtisans那里阅读此博客。 Watermark本质上是一个时间戳。当Flink中的算子(operator)接收到Watermark时,它明白它不会看到比该时间戳更早的消息。因此,在EventTime中,Watermark也可以被认为是告诉Flink它有多远的一种方式(Hence watermark can also be thought of as a way of telling Flink how far it is, in the “EventTime”.)。

在这个例子中Watermark的目的,就是把它看作是告诉Flink一个消息延迟多少的方式。在最后一次尝试中,我们将Watermark设置为当前系统时间。因此,不会有任何延迟的消息。我们现在将Watermark设置为当前时间减去5秒,这告诉Flink希望消息最多延迟5秒钟,这是因为每个窗口仅在Watermark通过时被评估。由于我们的Watermark是当前时间减去5秒,所以第一个窗口[5s-15s]将会在第20秒被评估。类似地,窗口[10s-20s]将会在第25秒进行评估,依此类推(译者注:窗口延迟评估)。

override def getCurrentWatermark(): Watermark = {
      new Watermark(System.currentTimeMillis - 5000)
}

通常最好保持接收到的最大时间戳,并创建最大时间减去预期延迟时间的Watermark,而不是用当前系统时间减去延迟时间(create the watermark with max - expected delay, instead of subtracting from the current system time)。

进行上述更改后运行代码的结果是:

最后我们得到了正确的结果,所有这三个窗口现在都按照预期的方式发送计数,就是(a,2),(a,3)和(a,1)。

我们也可以使用AllowedLateness功能设置消息的最大允许延迟时间来解决这个问题。

4. 结论

实时流处理系统的重要性日益增长,延迟消息的处理是你构建任何此类系统的一部分。在这篇博文中,我们看到延迟到达的消息会影响系统的结果,以及如何使用ApacheFlink的事件时间功能来解决它们。

原文:http://vishnuviswanath.com/flink_eventtime.html

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
10天前
|
消息中间件 资源调度 关系型数据库
如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理
本文介绍了如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理。主要内容包括安装Debezium、配置Kafka Connect、创建Flink任务以及启动任务的具体步骤,为构建实时数据管道提供了详细指导。
32 9
|
1月前
|
Java 流计算
Flink-03 Flink Java 3分钟上手 Stream 给 Flink-02 DataStreamSource Socket写一个测试的工具!
Flink-03 Flink Java 3分钟上手 Stream 给 Flink-02 DataStreamSource Socket写一个测试的工具!
37 1
Flink-03 Flink Java 3分钟上手 Stream 给 Flink-02 DataStreamSource Socket写一个测试的工具!
|
1月前
|
Java Shell 流计算
Flink-02 Flink Java 3分钟上手 Stream SingleOutputStreamOpe ExecutionEnvironment DataSet FlatMapFunction
Flink-02 Flink Java 3分钟上手 Stream SingleOutputStreamOpe ExecutionEnvironment DataSet FlatMapFunction
23 1
Flink-02 Flink Java 3分钟上手 Stream SingleOutputStreamOpe ExecutionEnvironment DataSet FlatMapFunction
|
1月前
|
存储 Java 数据处理
Flink-01 介绍Flink Java 3分钟上手 HelloWorld 和 Stream ExecutionEnvironment DataSet FlatMapFunction
Flink-01 介绍Flink Java 3分钟上手 HelloWorld 和 Stream ExecutionEnvironment DataSet FlatMapFunction
35 1
|
1月前
|
分布式计算 监控 大数据
大数据-129 - Flink CEP 详解 Complex Event Processing - 复杂事件处理
大数据-129 - Flink CEP 详解 Complex Event Processing - 复杂事件处理
53 0
|
3月前
|
消息中间件 存储 Kafka
【Azure 事件中心】Flink消费Event Hub中事件, 使用Azure默认示例代码,始终获取新产生的事件,如何消费旧事件呢?
【Azure 事件中心】Flink消费Event Hub中事件, 使用Azure默认示例代码,始终获取新产生的事件,如何消费旧事件呢?
|
3月前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
运维 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在处理MySQL表新增数据记录时,没有正确触发变更事件,该如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之同步MySQL数据并EP(复杂事件处理)时,编译报错,如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
122 1
|
5月前
|
关系型数据库 数据库 流计算
实时计算 Flink版操作报错合集之在使用Flink CDC TiDB Connector时,无法获取到事件,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
410 0