【建议收藏】Flink watermark分析实战(上)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【建议收藏】Flink watermark分析实战

摘要

Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算 flink中提供了时间窗的相关算子计算区域时间内的数据 本次分享基于flink 1.14 此次分享内容中,api演示与旧版略有不同,概念并无不同 本次分享需要对流式数据处理计算有一定的了解

概念篇

Flink时间语义概念简介

  • 在flink的流式处理中,会涉及到时间的不同概念
  • Processing Time 处理时间
  • Event Time 事件时间
  • Ingestion Time 注入时间
  • Processing Time 处理时间

每一个执行基于时间操作的算子的本地系统时间,与机器相关

  • Event Time 事件时间

事件发生的时间,通常由数据中的某个字段进行提供。

  • Ingestion Time 注入时间

数据进入flink的事件

640.png

时间语义

  • 就1.14版本而言,根据时间推进和时间判断的不同标准,一共由两种时间语义
  • 以process time为依据:处理时间语义
  • 以event time为依据:事件时间语义

对于事件时间的重要性和应用场景众所周知

需求和问题

需求

当前时间13:10,计算[13:00,13:10)分钟内订单数量/活跃用户数量

已知

flink中提供了时间窗的相关算子计算区域时间内的数据.

问题

由于网络波动或者网络传输的时间消耗, 一条由13:09分产生的数据,在13:11分才进入计算逻辑, 那么是否要将此数据计入到计算结果中?

在分布式运算中,不同节点的运算速度不同, 时间窗口先接收到一个并发中发送的13:10:00:000的数据, 时间窗口后接收到一个并发中发送的13:09:59:999的数据, 那么是否要将后接收到的这条数据计入到计算结果中?

分享者回答

如果是以事件时间进行处理的话,应当计入计算结果 如果是以处理时间进行处理的话,可以不计入计算结果 flink1.14不使用注入时间

再次提问

在业务场景中,我们很多需求都是要使用事件时间来作为依据的, 我想要按照时间事件来完成这个需求, 并且将迟到的数据也纳入到计算结果中, 应该如何解决?

问题总结

使用EventTime所要面对的问题

  1. 数据延迟
  1. 网络延迟
  2. 性能延迟
  3. ...
  1. 乱序
  1. 数据源数据相对于时间本身就无序
  2. 分布式场景下本身有序的数据也难以保持被读取时有序
  3. ...

分享者回答

如果使用类似于hive、doris、clinkhous之类的olap数据仓库, 我们可以等待到一个合适的时间(数据完全到达之后)再进行排序/计算, 而在flink中,提供了一个叫做watermark的机制来完成这个需求,应对这些问题。

watermark

背景

在流式数据中,虽然数据本身是按照时间顺序向下游推送的, 但在网络环境、分布式等因素下, 导致到达时间窗中的顺序并不是按照原本发送的顺序。有时数据发送的本身就不是按照严格的事件时间进行推送的

什么是watermark

以前我对watermark了解不够深的时候,我以为watermark是flink的时间等待机制, 后来我才知道,watermark是flink的事件时间推进机制,事件时间等待机制,只是他的一部分。

  1. watermark是解决数据乱序到达的,也可以理解为解决数据延迟到达,
  2. watermark在解决上述问题时,要结合flink的window(时间窗)机制,
  3. flink中的window(时间窗)是由watermark来触发的,这就意味着窗口触发时,数据中timeStamp<=watermark的,均已到达时间窗

watermark 事件时间推进机制

  • 特点
  1. watermark本身也会是上游向下游发送数据时,附带的一个记录
  2. watermark必须是单调递增的,保证任务的时间一直在往前推进,不可后退
  3. watermark由数据中的时间戳来更新

watermark的生命周期

env.getConfig().setAutoWatermarkInterval(200)//(默认值200ms)
如果要禁用watermark机制,可以通过设置watermark生成频率来实现
env.getConfig().setAutoWatermarkInterval(0)//(默认值200ms)
AssignerWithPeriodicWatermarks  (已过期)周期性生成watermark
AssignerWithPunctuatedWatermarks(已过期) 按照指定标记性事件生成watermark

watermark的更新机制

当flink开启watermark时,在所有的并发中的数据首先经过watermark管理,

source算子每200ms从数据中获取一次时间戳,并更新自己的maxTimeStamp,并广播到下游

下游的算子拿到数据时,并不会根据数中时间进行更新watermark,而是根据上游发送过来的数据中携带的maxTimeStamp来更新自身watermark的值

640.png

而是根据watermark广播到下游的maxTimeStamp值进行触发和结束,计算,

下游每200ms对比各个并发发送的maxTimeStamp,并根据最小值,刷新自身的maxTimeStamp并广播到下游

当上游有多个watermark发来的maxTimeStamp值,下游更新自身maxTimeStamp时取最小值 以最小值为基准,较大值到达时可以分发到他应该到的时间分桶中, 如果收到超出时间窗之外的未来数据,会创建此数据应有的时间窗,并开始缓存,时间窗(桶)的数量时没有限制的 如果以最大值为基准,会导致时间窗提前结束,maxTimeStamp较小的被抛弃掉

640.png

问题/需求解决

watermark是如何解决我们讨论之处提出的问题的呢? 我们也是时候上代码了!!!

完整的watermark使用代码

WatermarkStrategy<Bean> beanWatermarkStrategy = WatermarkStrategy
               .forGenerator(new WatermarkGeneratorSupplier<Bean>() {
                   @Override
                   public WatermarkGenerator<Bean> createWatermarkGenerator(Context context) {
                      return new WatermarkGenerator<Bean>() {
                          /** 最大时间戳. */
                           private long maxTimestamp;
                          /** 水印生成的最大无序度 */
                           private final long outOfOrdernessMillis = 0;
                           //watermark比较器
                           @Override
                           public void onEvent(Bean event, long eventTimestamp, WatermarkOutput output) {
                               maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
                           }
                           //watermark生成和发送
                           @Override
                           public void onPeriodicEmit(WatermarkOutput output) {
                               output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
                           }
                       };
                   }
               })
//                .noWatermarks()  //创建完全不生成水印的水印策略。这在执行纯处理基于时间的流处理的场景中可能很有用。
//                .forMonotonousTimestamps()    //紧跟最大时间时间,完全不容忍乱序
//                .<Bean>forBoundedOutOfOrderness(Duration.ofMillis(0))  //允许乱序的生成策略   最大时间时间-容错时间
               .withIdleness(Duration.ofSeconds(5))    //当某一并发迟迟没有数据进来时,多长时间发送一次watermark值
               .withTimestampAssigner(new SerializableTimestampAssigner<Bean>() {
                   @Override
                   public long extractTimestamp(Bean element, long recordTimestamp) {
                       return element.getEventTime();
                   }
               })//watermark提取策略(从数据中)

小延迟 - watermark推后机制 - BoundedOutOfOrderness策略

  • BoundedOutOfOrderness策略

用wartermark容错,减慢时间的推进,在迟到数据到达时,让下游认为他还没有迟到

说句人话,实际上就是用已经获取到的时间戳-允许迟到的时间=watermark值

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
199 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
3月前
|
SQL 消息中间件 分布式计算
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
99 5
|
3月前
|
消息中间件 分布式计算 大数据
大数据-121 - Flink Time Watermark 详解 附带示例详解
大数据-121 - Flink Time Watermark 详解 附带示例详解
95 0
|
25天前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
396 2
探索Flink动态CEP:杭州银行的实战案例
|
5月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
334 2
|
2月前
|
SQL 流计算 关系型数据库
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
阿里云OpenLake解决方案建立在开放可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。通过元数据管理平台DLF管理结构化、半结构化和非结构化数据,提供湖仓数据表和文件的安全访问及IO加速,并支持大数据、搜索和AI多引擎对接。本文为您介绍以Flink作为Openlake方案的核心计算引擎,通过流式数据湖仓Paimon(使用DLF 2.0存储)和EMR StarRocks搭建流式湖仓。
549 5
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
|
3月前
|
分布式计算 Java 大数据
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
52 0
|
3月前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
105 0
|
5月前
|
大数据 API 数据处理
揭秘!Flink如何从默默无闻到大数据界的璀璨明星?起源、设计理念与实战秘籍大公开!
【8月更文挑战第24天】Apache Flink是一款源自Stratosphere项目的开源流处理框架,由柏林理工大学等机构于2010至2014年间开发,并于2014年捐赠给Apache软件基金会。Flink设计之初即聚焦于提供统一的数据处理模型,支持事件时间处理、精确一次状态一致性等特性,实现了流批一体化处理。其核心优势包括高吞吐量、低延迟及强大的容错机制。
100 1
|
5月前
|
消息中间件 数据挖掘 Kafka
揭秘数据洪流中的救世主:Confluent与Flink的实时分析奇迹!
【8月更文挑战第9天】在现代数据处理中,实时数据分析至关重要。Confluent Platform与Apache Flink的组合提供了一套高效的数据流处理方案。Confluent Platform基于Apache Kafka构建,负责数据的收集、传输及管理;而Flink则擅长实时处理这些数据流,进行复杂分析。首先需配置Confluent Platform,包括设置Zookeeper、Kafka brokers及相关服务。
107 1