【Flink】(五)时间语义和水位线 (Watermark) 2

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【Flink】(五)时间语义和水位线 (Watermark) 2


3.4 Watermark 的引入


watermark 的引入很简单,对于乱序数据,最常见的引用方式如下:

dataStream.assignTimestampsAndWatermarks( new 
BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.millisecond
s(1000)) {
 override def extractTimestamp(element: SensorReading): Long = {
 element.timestamp * 1000
 }
} )


Event Time 的使用一定要指定数据源中的时间戳。否则程序无法知道事件的事件时间是什么(数据源里的数据没有时间戳的话,就只能使用Processing Time 了)。


我们看到上面的例子中创建了一个看起来有点复杂的类,这个类实现的其实就是分配时间戳的接口。Flink 暴露了 TimestampAssigner 接口供我们实现,使我们可以自定义如何从事件数据中抽取时间戳。

val env = StreamExecutionEnvironment.getExecutionEnvironment
// 从调用时刻开始给 env 创建的每一个 stream 追加时间特性
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val readings: DataStream[SensorReading] = env
.addSource(new SensorSource) 
.assignTimestampsAndWatermarks(new MyAssigner())

MyAssigner 有两种类型


AssignerWithPeriodicWatermarks

AssignerWithPunctuatedWatermarks

以上两个接口都继承自 TimestampAssigner。


Assigner with periodic watermarks

周期性的生成 watermark:系统会周期性的将 watermark 插入到流中(水位线也是一种特殊的事件!)。默认周期是 200 毫秒。可以使用

ExecutionConfig.setAutoWatermarkInterval()方法进行设置。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 每隔 5 秒产生一个 watermark
env.getConfig.setAutoWatermarkInterval(5000)

产生 watermark 的逻辑:每隔 5 秒钟,Flink 会调用

AssignerWithPeriodicWatermarks 的 getCurrentWatermark()方法。


如果方法返回一个时间戳大于之前水位的时间戳,新的 watermark 会被插入到流中。这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于等于之前水位的时间戳,则不会产生新的 watermark。


例子,自定义一个周期性的时间戳抽取:

class PeriodicAssigner extends
AssignerWithPeriodicWatermarks[SensorReading] {
val bound: Long = 60 * 1000 // 延时为 1 分钟
var maxTs: Long = Long.MinValue // 观察到的最大时间戳
override def getCurrentWatermark: Watermark = {
  new Watermark(maxTs - bound) 
}
override def extractTimestamp(r: SensorReading, previousTS: Long) = {
maxTs = maxTs.max(r.timestamp) 
r.timestamp
  } 
}


Assigner with punctuated watermarks


间断式地生成 watermark。和周期性生成的方式不同,这种方式不是固定时间的,而是可以根据需要对每条数据进行筛选和处理。直接上代码来举个例子,我们只给sensor_1 的传感器的数据流插入 watermark:

class PunctuatedAssigner extends
AssignerWithPunctuatedWatermarks[SensorReading] {
  val bound: Long = 60 * 1000
  override def checkAndGetNextWatermark(r: SensorReading, extractedTS:
Long): Watermark = {
  if (r.id == "sensor_1") {
    new Watermark(extractedTS - bound) 
  } else {
  null
  } 
}
override def extractTimestamp(r: SensorReading, previousTS: Long): Long
= { 
  r.timestamp
  } 
}


3.5 Watermark 的设定


在 Flink 中,watermark 由应用程序开发人员生成,这通常需要对相应的领域有一定的了解


如果 watermark 设置的延迟太久,收到结果的速度可能就会很慢,解决办法是在水位线到达之前输出一个近似结果


而如果 watermark 到达得太早,则可能收到错误结果,不过 Flink 处理迟到数据的机制可以解决这个问题


四、EvnetTime 在 window 中的使用


4.1 滚动窗口(TumblingEventTimeWindows)

def main(args: Array[String]): Unit = {
 // 环境
 val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 env.setParallelism(1)
 val dstream: DataStream[String] = env.socketTextStream("localhost",7777)
 val textWithTsDstream: DataStream[(String, Long, Int)] = dstream.map
{ text =>
 val arr: Array[String] = text.split(" ")
 (arr(0), arr(1).toLong, 1)
 }
 val textWithEventTimeDstream: DataStream[(String, Long, Int)] = 
textWithTsDstream.assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor[(String, Long,
Int)](Time.milliseconds(1000)) {
 override def extractTimestamp(element: (String, Long, Int)): Long = {
 return element._2
 }
 })
 val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = 
textWithEventTimeDstream.keyBy(0)
 textKeyStream.print("textkey:")
 val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] 
= textKeyStream.window(TumblingEventTimeWindows.of(Time.seconds(2)))
 val groupDstream: DataStream[mutable.HashSet[Long]] = 
windowStream.fold(new mutable.HashSet[Long]()) { case (set, (key, ts, count)) 
=>
 set += ts
 }
 groupDstream.print("window::::").setParallelism(1)
  env.execute()
  } 
 }


结果是按照 Event Time 的时间窗口计算得出的,而无关系统的时间(包括输入的快慢)。


4.2 滑动窗口(SlidingEventTimeWindows)

def main(args: Array[String]): Unit = {
 // 环境
 val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 env.setParallelism(1)
 val dstream: DataStream[String] = env.socketTextStream("localhost",7777)
 val textWithTsDstream: DataStream[(String, Long, Int)] = dstream.map { text 
=>
 val arr: Array[String] = text.split(" ")
 (arr(0), arr(1).toLong, 1)
 }
 val textWithEventTimeDstream: DataStream[(String, Long, Int)] = 
textWithTsDstream.assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor[(String, Long, 
Int)](Time.milliseconds(1000)) {
 override def extractTimestamp(element: (String, Long, Int)): Long = {
  return element._2
 }
 })
 val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = 
textWithEventTimeDstream.keyBy(0)
 textKeyStream.print("textkey:")
 val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = 
textKeyStream.window(SlidingEventTimeWindows.of(Time.seconds(2),Time.millis
econds(500)))
 val groupDstream: DataStream[mutable.HashSet[Long]] = windowStream.fold(new 
mutable.HashSet[Long]()) { case (set, (key, ts, count)) =>
 set += ts
 }
 groupDstream.print("window::::").setParallelism(1)
 env.execute()
}


4.3 会话窗口(EventTimeSessionWindows)


相邻两次数据的 EventTime 的时间差超过指定的时间间隔就会触发执行。如果加入 Watermark, 会在符合窗口触发的情况下进行延迟。到达延迟水位再进行窗口触发。

def main(args: Array[String]): Unit = {
 // 环境
 val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 env.setParallelism(1)
 val dstream: DataStream[String] = env.socketTextStream("localhost",7777)
 val textWithTsDstream: DataStream[(String, Long, Int)] = dstream.map { text 
=>
 val arr: Array[String] = text.split(" ")
 (arr(0), arr(1).toLong, 1)
 }
 val textWithEventTimeDstream: DataStream[(String, Long, Int)] = 
textWithTsDstream.assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor[(String, Long, 
Int)](Time.milliseconds(1000)) {
 override def extractTimestamp(element: (String, Long, Int)): Long = {
 return element._2
 }
 })
 val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = 
textWithEventTimeDstream.keyBy(0)
 textKeyStream.print("textkey:")
 val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] 
=
textKeyStream.window(EventTimeSessionWindows.withGap(Time.milliseconds(500)
) )
 windowStream.reduce((text1,text2)=>
 ( text1._1,0L,text1._3+text2._3)
 ) .map(_._3).print("windows:::").setParallelism(1)
  env.execute()
 }



相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3月前
|
程序员 API 数据安全/隐私保护
[尚硅谷flink] 水位线
[尚硅谷flink] 水位线
|
3月前
|
数据处理 Apache 流计算
Flink Watermark和时间语义
Flink Watermark和时间语义
49 2
|
3月前
|
Apache 流计算
【Flink】Flink的三种时间语义
【4月更文挑战第19天】【Flink】Flink的三种时间语义
|
1天前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之水位线的设置方法是什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如果要加水位线,应该怎么操作
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
数据处理 Apache 流计算
Flink Watermark和时间语义
Flink Watermark和时间语义
|
3月前
|
数据处理 Apache 流计算
【Flink】Flink 中的Watermark机制
【4月更文挑战第21天】【Flink】Flink 中的Watermark机制
|
3月前
|
BI 数据处理 Apache
[AIGC] 深入理解Flink中的窗口、水位线和定时器
[AIGC] 深入理解Flink中的窗口、水位线和定时器
|
3月前
|
存储 SQL Java
Flink报错问题之使用Watermark报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
3月前
|
运维 监控 数据处理
【天衍系列 03】深入理解Flink的Watermark:实时流处理的时间概念与乱序处理
【天衍系列 03】深入理解Flink的Watermark:实时流处理的时间概念与乱序处理
117 5