【建议收藏】Flink watermark分析实战(上)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【建议收藏】Flink watermark分析实战

摘要

Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算 flink中提供了时间窗的相关算子计算区域时间内的数据 本次分享基于flink 1.14 此次分享内容中,api演示与旧版略有不同,概念并无不同 本次分享需要对流式数据处理计算有一定的了解

概念篇

Flink时间语义概念简介

  • 在flink的流式处理中,会涉及到时间的不同概念
  • Processing Time 处理时间
  • Event Time 事件时间
  • Ingestion Time 注入时间
  • Processing Time 处理时间

每一个执行基于时间操作的算子的本地系统时间,与机器相关

  • Event Time 事件时间

事件发生的时间,通常由数据中的某个字段进行提供。

  • Ingestion Time 注入时间

数据进入flink的事件

640.png

时间语义

  • 就1.14版本而言,根据时间推进和时间判断的不同标准,一共由两种时间语义
  • 以process time为依据:处理时间语义
  • 以event time为依据:事件时间语义

对于事件时间的重要性和应用场景众所周知

需求和问题

需求

当前时间13:10,计算[13:00,13:10)分钟内订单数量/活跃用户数量

已知

flink中提供了时间窗的相关算子计算区域时间内的数据.

问题

由于网络波动或者网络传输的时间消耗, 一条由13:09分产生的数据,在13:11分才进入计算逻辑, 那么是否要将此数据计入到计算结果中?

在分布式运算中,不同节点的运算速度不同, 时间窗口先接收到一个并发中发送的13:10:00:000的数据, 时间窗口后接收到一个并发中发送的13:09:59:999的数据, 那么是否要将后接收到的这条数据计入到计算结果中?

分享者回答

如果是以事件时间进行处理的话,应当计入计算结果 如果是以处理时间进行处理的话,可以不计入计算结果 flink1.14不使用注入时间

再次提问

在业务场景中,我们很多需求都是要使用事件时间来作为依据的, 我想要按照时间事件来完成这个需求, 并且将迟到的数据也纳入到计算结果中, 应该如何解决?

问题总结

使用EventTime所要面对的问题

  1. 数据延迟
  1. 网络延迟
  2. 性能延迟
  3. ...
  1. 乱序
  1. 数据源数据相对于时间本身就无序
  2. 分布式场景下本身有序的数据也难以保持被读取时有序
  3. ...

分享者回答

如果使用类似于hive、doris、clinkhous之类的olap数据仓库, 我们可以等待到一个合适的时间(数据完全到达之后)再进行排序/计算, 而在flink中,提供了一个叫做watermark的机制来完成这个需求,应对这些问题。

watermark

背景

在流式数据中,虽然数据本身是按照时间顺序向下游推送的, 但在网络环境、分布式等因素下, 导致到达时间窗中的顺序并不是按照原本发送的顺序。有时数据发送的本身就不是按照严格的事件时间进行推送的

什么是watermark

以前我对watermark了解不够深的时候,我以为watermark是flink的时间等待机制, 后来我才知道,watermark是flink的事件时间推进机制,事件时间等待机制,只是他的一部分。

  1. watermark是解决数据乱序到达的,也可以理解为解决数据延迟到达,
  2. watermark在解决上述问题时,要结合flink的window(时间窗)机制,
  3. flink中的window(时间窗)是由watermark来触发的,这就意味着窗口触发时,数据中timeStamp<=watermark的,均已到达时间窗

watermark 事件时间推进机制

  • 特点
  1. watermark本身也会是上游向下游发送数据时,附带的一个记录
  2. watermark必须是单调递增的,保证任务的时间一直在往前推进,不可后退
  3. watermark由数据中的时间戳来更新

watermark的生命周期

env.getConfig().setAutoWatermarkInterval(200)//(默认值200ms)
如果要禁用watermark机制,可以通过设置watermark生成频率来实现
env.getConfig().setAutoWatermarkInterval(0)//(默认值200ms)
AssignerWithPeriodicWatermarks  (已过期)周期性生成watermark
AssignerWithPunctuatedWatermarks(已过期) 按照指定标记性事件生成watermark

watermark的更新机制

当flink开启watermark时,在所有的并发中的数据首先经过watermark管理,

source算子每200ms从数据中获取一次时间戳,并更新自己的maxTimeStamp,并广播到下游

下游的算子拿到数据时,并不会根据数中时间进行更新watermark,而是根据上游发送过来的数据中携带的maxTimeStamp来更新自身watermark的值

640.png

而是根据watermark广播到下游的maxTimeStamp值进行触发和结束,计算,

下游每200ms对比各个并发发送的maxTimeStamp,并根据最小值,刷新自身的maxTimeStamp并广播到下游

当上游有多个watermark发来的maxTimeStamp值,下游更新自身maxTimeStamp时取最小值 以最小值为基准,较大值到达时可以分发到他应该到的时间分桶中, 如果收到超出时间窗之外的未来数据,会创建此数据应有的时间窗,并开始缓存,时间窗(桶)的数量时没有限制的 如果以最大值为基准,会导致时间窗提前结束,maxTimeStamp较小的被抛弃掉

640.png

问题/需求解决

watermark是如何解决我们讨论之处提出的问题的呢? 我们也是时候上代码了!!!

完整的watermark使用代码

WatermarkStrategy<Bean> beanWatermarkStrategy = WatermarkStrategy
               .forGenerator(new WatermarkGeneratorSupplier<Bean>() {
                   @Override
                   public WatermarkGenerator<Bean> createWatermarkGenerator(Context context) {
                      return new WatermarkGenerator<Bean>() {
                          /** 最大时间戳. */
                           private long maxTimestamp;
                          /** 水印生成的最大无序度 */
                           private final long outOfOrdernessMillis = 0;
                           //watermark比较器
                           @Override
                           public void onEvent(Bean event, long eventTimestamp, WatermarkOutput output) {
                               maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
                           }
                           //watermark生成和发送
                           @Override
                           public void onPeriodicEmit(WatermarkOutput output) {
                               output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
                           }
                       };
                   }
               })
//                .noWatermarks()  //创建完全不生成水印的水印策略。这在执行纯处理基于时间的流处理的场景中可能很有用。
//                .forMonotonousTimestamps()    //紧跟最大时间时间,完全不容忍乱序
//                .<Bean>forBoundedOutOfOrderness(Duration.ofMillis(0))  //允许乱序的生成策略   最大时间时间-容错时间
               .withIdleness(Duration.ofSeconds(5))    //当某一并发迟迟没有数据进来时,多长时间发送一次watermark值
               .withTimestampAssigner(new SerializableTimestampAssigner<Bean>() {
                   @Override
                   public long extractTimestamp(Bean element, long recordTimestamp) {
                       return element.getEventTime();
                   }
               })//watermark提取策略(从数据中)

小延迟 - watermark推后机制 - BoundedOutOfOrderness策略

  • BoundedOutOfOrderness策略

用wartermark容错,减慢时间的推进,在迟到数据到达时,让下游认为他还没有迟到

说句人话,实际上就是用已经获取到的时间戳-允许迟到的时间=watermark值

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
21天前
|
自然语言处理 监控 数据挖掘
【Flink】Flink中的窗口分析
【4月更文挑战第19天】【Flink】Flink中的窗口分析
|
7月前
|
分布式计算 Hadoop 大数据
大数据Hadoop之——Apache Hudi 数据湖实战操作(Spark,Flink与Hudi整合)
大数据Hadoop之——Apache Hudi 数据湖实战操作(Spark,Flink与Hudi整合)
|
17天前
|
传感器 存储 缓存
[尚硅谷flink学习笔记] 实战案例TopN 问题
这段内容是关于如何使用Apache Flink解决实时统计水位传感器数据中,在一定时间窗口内出现次数最多的水位问题,即&quot;Top N&quot;问题。首先,介绍了一个使用滑动窗口的简单实现,通过收集传感器数据,按照水位计数,然后排序并输出前两名。接着,提出了全窗口和优化方案,其中优化包括按键分区(按水位vc分组)、开窗操作(增量聚合计算count)和过程函数处理(聚合并排序输出Top N结果)。最后,给出了一个使用`KeyedProcessFunction`进行优化的示例代码,通过按键by窗口结束时间,确保每个窗口的所有数据到达后再进行处理,提高了效率。
|
2月前
|
SQL 缓存 分布式计算
flink1.18 SqlGateway 的使用和原理分析
# 了解flink1.18 sqlGateway 的安装和使用步骤 # 启动sqlgateway 流程,了解核心的结构 # sql提交流程,了解sql 的流转逻辑 # select 查询的ResultSet的对接流程,了解数据的返回和获取逻辑
|
3月前
|
消息中间件 Kafka API
【极数系列】ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter & 详细分析解决
【极数系列】ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter & 详细分析解决
|
4月前
|
存储 NoSQL MongoDB
阿里云 Flink 原理分析与应用:深入探索 MongoDB Schema Inference
本文整理自阿里云 Flink 团队归源老师关于阿里云 Flink 原理分析与应用:深入探索 MongoDB Schema Inference 的研究。
46950 2
阿里云 Flink 原理分析与应用:深入探索 MongoDB Schema Inference
|
5月前
|
消息中间件 监控 Java
一次线上Flink 背压情况分析之重新认识java dump 文件
一次线上Flink 背压情况分析之重新认识java dump 文件
77 0
|
5月前
|
SQL 消息中间件 分布式数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
67 0
|
5月前
|
消息中间件 大数据 Kafka
Flink史上最简单双十一实时分析案例
Flink史上最简单双十一实时分析案例
40 0
|
6月前
|
资源调度 Java 调度
Flink教程(05)- Flink原理简单分析
Flink教程(05)- Flink原理简单分析
69 0