Flink - 新增 BroadcastStream 无 watermark 导致数据流异常

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink 任务新增 BroadcastStream 无 watermark 导致数据流异常,修复问题并熟悉单流,双流 Watermark 机制。

 一.问题分析

原始程序使用 EventTime,JobGraph 为 Source + KeyBy + ProcessFunction + Window + Sink 形式,其中 ProcessFunction 内设置了 ValueState 与 onTimer 的机制,由于需要定时更新一些任务需要的实时变量,故引入 BroadcastStream 实现实时变量的不定时更新,经过修改后的 JobGraph 为 Source + keyd-Broadcast-Process + window + Sink,如图所示:

image.gif编辑

任务执行后发现第三步 Co-Process-Broadcast-keyed Stage 部分 Records-Received 正常,但是 Bytes Sent 消失了,即下游没有 (onTimer) 的数据写入到 window ,导致所有数据卡在第三步,window 无法触发:

image.gif编辑

再看下游算子发现 WaterMark 均为空:

image.gif编辑

这时才想起来,自己的任务设定为 EventTime,原始 Source 设置了时间戳和 watermark 但新增的 BroadcastStream 未设置时间戳和 watermark 遂导致另外一个 Source 不更新 watermark,导致整个任务的 watermark 无法推进,从而导致任务卡死。所以解决方法就是给 BroadcastStream 实现 assignTimestampsAndWatermarks 方法设置时间戳和 watermark 即可。

二.问题修复

broadcastStream 主题框架如下,如果需要完整的示例可以参考: DataStream Broadcast 示例详解

DataStream<T> output = dataStream
                 .connect(BroadcastStream)
                 .process(
                     // KeyedBroadcastProcessFunction 中的类型参数表示:
                     //   1. key stream 中的 key 类型
                     //   2. 非广播流中的元素类型
                     //   3. 广播流中的元素类型
                     //   4. 结果的类型,在这里是 string
                     new KeyedBroadcastProcessFunction<Ks, In1, In2, Out>() {
                         // 模式匹配逻辑
                     }
                 );

image.gif

下面详细说下 BroadcastStream,任务失败之前我的 BroadcastStream 是这样定义的:

MapStateDescriptor<String, T> descriptor = new MapStateDescriptor<>("T", String.class, T.class);
BroadcastStream<T> contextStream = env
.addSource(new SelfDefinedSource())
.setParallelism(1)
.broadcast(descriptor);

image.gif

这里通过自定义的 SelfDefinedSource 函数定期读取更新的在线变量,并后续通过 ctx.collect() 生产到 BroadcastStream 中,由于未对该数据流设置 watermark 导致任务失败,下面继承 BoundedOutOfOrdernessTimestampExtractor 类为该数据流设置 watermark:

public static class ContextTSExtrator extends BoundedOutOfOrdernessTimestampExtractor<T> {
        public MultiSortContextTSExtrator() {
            super(Time.seconds(MAX_SEND_EVENT_DELAY));
        }
        @Override
        public long extractTimestamp(T t) {
            return System.currentTimeMillis();
        }
    }

image.gif

这里直接设置为 System.currentTimeMillis,如果需要从 T 中获取时间戳也可以生成 T 时绑定其 EventTimeStamp,随后为源数据通过 assign 方法增加时间戳:

BroadcastStream<T> contextStream = env
.addSource(new SelfDefinedSource())
.assignTimestampsAndWatermarks(newContextTSExtrator())
.setParallelism(1)
.broadcast(descriptor);

image.gif

增加完时间戳后,任务正常执行,可以看到流程图上 keyd-Broadcast-Process 部分和 window 部分也都有了实时更新的 watermark,任务正常进行:

image.gif编辑

三.优化

processFunction 中处理后的数据设置时间=t 的过期时间,即 eventTime + t 为 onTimer 的时机,BroadCastStream 则时每隔 X 分钟读取线上最新变量并更新最后通过 context.collect(T) 发出,X 分钟通过 Times.sleep() 实现。理想情况每隔数据的间隔为 t ,但根据 stage 和日志中的日志发现每隔元素处理到 sink 的间隔为 t + X,即延时处理的时间与定时读取实时变量的间隔一致。

上述新增的 waterMark 为 System.currentimeMilles,但是因为每隔 X 分钟执行一次,所以 broadcastStream 发出的 waterMark 总是落后的,基于这个事实继续排查问题。

1.单流 Watermark 机制

首先看一下每个数据源的 watermark 更新机制,这个可以在 AssignerWithPeriodicWatermarks 类中看到,默认不需要我们复写:

public final Watermark getCurrentWatermark() {
        long potentialWM = this.currentMaxTimestamp - this.maxOutOfOrderness;
        if (potentialWM >= this.lastEmittedWatermark) {
            this.lastEmittedWatermark = potentialWM;
        }
        return new Watermark(this.lastEmittedWatermark);

image.gif

将当前元素的时间戳减去最大的迟到容忍时间获取 poetntialWaterMark,如果大于上一次发出的 waterMark,则重新赋值,即取 currentWaterMark = Max(poentitialWaterMark,lastEmittedWaterMark),该值代表 Flink 认为小于该 currentWaterMark 值的数据都到了。

2.多流 WaterMark 机制

co-broadcast-keyed-stream 属于双流任务,每个流都包含一个 waterMark,两个流同时发出 waterMar,Flink 底层通过继承 TwoInputStreamOperator 接口完成对双流 element 和 waterMark 的处理:

public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {
    void processElement1(StreamRecord<IN1> var1) throws Exception;
    void processElement2(StreamRecord<IN2> var1) throws Exception;
    void processWatermark1(Watermark var1) throws Exception;
    void processWatermark2(Watermark var1) throws Exception;
    void processLatencyMarker1(LatencyMarker var1) throws Exception;
    void processLatencyMarker2(LatencyMarker var1) throws Exception;
}

image.gif

基于我们上面遇到的延时问题,这里需要重点关注 processWatermark 的逻辑,processWatermark 的逻辑放在了 AbstractStreamOperator 抽象类下:

public void processWatermark(Watermark mark) throws Exception {
        if (this.timeServiceManager != null) {
            this.timeServiceManager.advanceWatermark(mark);
        }
        this.output.emitWatermark(mark);
    }
    public void processWatermark1(Watermark mark) throws Exception {
        this.input1Watermark = mark.getTimestamp();
        long newMin = Math.min(this.input1Watermark, this.input2Watermark);
        if (newMin > this.combinedWatermark) {
            this.combinedWatermark = newMin;
            this.processWatermark(new Watermark(this.combinedWatermark));
        }
    }
    public void processWatermark2(Watermark mark) throws Exception {
        this.input2Watermark = mark.getTimestamp();
        long newMin = Math.min(this.input1Watermark, this.input2Watermark);
        if (newMin > this.combinedWatermark) {
            this.combinedWatermark = newMin;
            this.processWatermark(new Watermark(this.combinedWatermark));
        }
    }

image.gif

针对 Stream1 和 Stream2 分别执行 processWatermark1 和 processWatermark2,其内部通过 Min 方法选取两个流中最小的一个作为新的最小值 WaterMark,然后再和 lastEmittedWatermark 进行比较再推进整个任务的 watermark 流动。所以针对上述等待时间 X 过长的问题,我们需要提高 BroadcastStream 的 watermark 生产效率,即缩短产生 T 的间隔。

3.缩短 Watermark 发送间隔

将间隔 X 缩短至非常小的间隔 x,需要给生成的 T 增加一个 valid 属性,给 Source 函数增加一个 epoch 控制,只有当 epoch * x = X 的时候,读取线上更新并发送一个 valid = true 的 T,其余时间发送 valid = false 的 T(null),随后在 ProcessBroadcastValue 时判断 valid 状态,只有为 true 时才更新 Broadcast 的 value 到各个 task。

override def run(sourceContext: SourceFunction.SourceContext[T]): Unit = {
    Client client = new Socket(host, port);
    while (isRunning) {
      val t = if (epoch % X == 0) {
        ... 读取线上变量
        re.setValid() // 生效
        re
      } else {
        val re = new T(null)
        re
      }
      sourceContext.collect(t)
      TimeUnit.SECONDS.sleep(x)
      epoch += 1
    }
  }

image.gif

例如之前以 X=300s 的间隔生成 T,将间隔缩短为 x=5s,则推算出 epoch = 300/5 = 60,所以 if 逻辑内为 epoch % 60 == 0,其余时间设计以此类推。

4.更优的方案

上述方案把间隔从 X 优化至 x,但是任务执行的数据源其实是另一个 DataStream,BroadcastStream 只负责定时更新 DataStream 需要的变量,所以最好的方案是任务 watermark 流动完全取决于 DataStream。这时再回看双流 processWatermark 函数:

public void processWatermarkX(Watermark mark) throws Exception {
        this.input2Watermark = mark.getTimestamp();
        long newMin = Math.min(this.input1Watermark, this.input2Watermark);
        if (newMin > this.combinedWatermark) {
            this.combinedWatermark = newMin;
            this.processWatermark(new Watermark(this.combinedWatermark));
        }
    }

image.gif

newMin 是通过取双流最小,然后再以最小的为基准,退化为单流更新 WaterMark 更新逻辑,只取决于 DataStream ,那我们把 Broadcast 的 waterMark 设定为 Max 不就完了,这样每次 newMin 的值都取决于 DataStream,从而保证 watrtMark 流动完全取决于 DataStream,这样最后很短的间隔 x 也没有了,任务执行的延迟完全为 onTimer 设置的 valueState 的过期时间。

修改 BroadcastStream 设定时间戳和 watermark 的函数:

直接设置为 Watermark 类自带的 MAX_WARTERMARK.getTimeStamp(),自己设置一个非常大的时间戳也可以。

public static class ContextTSExtrator extends BoundedOutOfOrdernessTimestampExtractor<T> {
        public MultiSortContextTSExtrator() {
            super(Time.seconds(MAX_SEND_EVENT_DELAY));
        }
        @Override
        public long extractTimestamp(T t) {
            return Watermark.MAX_WATERMARK.getTimestamp();
        }
    }

image.gif

其值为:

static final Watermark MAX_WATERMARK = new Watermark(9223372036854775807L);

image.gif

四.总结

通过增加和修改 watermark 逻辑,增加 broadcastStream 的双流任务终于正常执行,且数据流动完全取决于原始数据流,非常的完美。出现这样的问题还是对 Flink 的 watermark 机制不是很清晰,后续还需要继续加深对 watermark 的理解。


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
2月前
|
消息中间件 分布式计算 大数据
大数据-121 - Flink Time Watermark 详解 附带示例详解
大数据-121 - Flink Time Watermark 详解 附带示例详解
76 0
|
2月前
|
数据挖掘 物联网 数据处理
深入探讨Apache Flink:实时数据流处理的强大框架
在数据驱动时代,企业需高效处理实时数据流。Apache Flink作为开源流处理框架,以其高性能和灵活性成为首选平台。本文详细介绍Flink的核心特性和应用场景,包括实时流处理、强大的状态管理、灵活的窗口机制及批处理兼容性。无论在实时数据分析、金融服务、物联网还是广告技术领域,Flink均展现出巨大潜力,是企业实时数据处理的理想选择。随着大数据需求增长,Flink将继续在数据处理领域发挥重要作用。
92 0
|
2月前
|
分布式计算 Java 大数据
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
36 0
|
4月前
|
资源调度 监控 关系型数据库
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
|
4月前
|
资源调度 关系型数据库 MySQL
【Flink on YARN + CDC 3.0】神操作!看完这篇教程,你也能成为数据流处理高手!从零开始,一步步教会你在Flink on YARN模式下如何配置Debezium CDC 3.0,让你的数据库变更数据瞬间飞起来!
【8月更文挑战第15天】随着Apache Flink的普及,企业广泛采用Flink on YARN部署流处理应用,高效利用集群资源。变更数据捕获(CDC)工具在现代数据栈中至关重要,能实时捕捉数据库变化并转发给下游系统处理。本文以Flink on YARN为例,介绍如何在Debezium CDC 3.0中配置MySQL连接器,实现数据流处理。首先确保YARN上已部署Flink集群,接着安装Debezium MySQL连接器并配置Kafka Connect。最后,创建Flink任务消费变更事件并提交任务到Flink集群。通过这些步骤,可以构建出从数据库变更到实时处理的无缝数据管道。
323 2
|
4月前
|
消息中间件 大数据 Kafka
"Apache Flink:重塑大数据实时处理新纪元,卓越性能与灵活性的实时数据流处理王者"
【8月更文挑战第10天】Apache Flink以卓越性能和高度灵活性在大数据实时处理领域崭露头角。它打破批处理与流处理的传统界限,采用统一模型处理有界和无界数据流,提升了开发效率和系统灵活性。Flink支持毫秒级低延迟处理,通过时间窗口、状态管理和自动并行化等关键技术确保高性能与可靠性。示例代码展示了如何使用Flink从Kafka读取实时数据并进行处理,简明扼要地呈现了Flink的强大能力。随着技术进步,Flink将在更多场景中提供高效可靠的解决方案,持续引领大数据实时处理的发展趋势。
104 7
|
4月前
|
Java 微服务 Spring
驾驭复杂性:Spring Cloud在微服务构建中的决胜法则
【8月更文挑战第31天】Spring Cloud是在Spring Framework基础上打造的微服务解决方案,提供服务发现、配置管理、消息路由等功能,适用于构建复杂的微服务架构。本文介绍如何利用Spring Cloud搭建微服务,包括Eureka服务发现、Config Server配置管理和Zuul API网关等组件的配置与使用。通过Spring Cloud,可实现快速开发、自动化配置,并提升系统的伸缩性和容错性,尽管仍需面对分布式事务等挑战,但其强大的社区支持有助于解决问题。
79 0
|
4月前
|
消息中间件 Java 数据处理
揭秘Apache Flink的Exactly-Once神技:如何在数据流海中确保每条信息精准无误,不丢不重?
【8月更文挑战第26天】Apache Flink 是一款先进的流处理框架,其核心特性 Exactly-Once 语义保证了数据处理的精准无误。尤其在金融及电商等高要求场景下,该特性极为关键。本文深入解析 Flink 如何实现 Exactly-Once 语义:通过状态管理确保中间结果可靠存储;利用一致的检查点机制定期保存状态快照;以及通过精确的状态恢复避免数据重复处理或丢失。最后,提供一个 Java 示例,展示如何计算用户访问次数,并确保 Exactly-Once 语义的应用。
98 0
|
4月前
|
监控 Apache 流计算
时间的守卫者:揭秘Flink中Watermark如何掌控数据流的时空秩序?
【8月更文挑战第26天】Apache Flink是一款功能强大的流处理框架,其Watermark机制为核心,确保了系统即使面对数据乱序或延迟也能准确处理时间相关的特性。Watermark作为一种特殊事件,标记了所有在此之前发生事件的最晚时间点,这对于时间窗口操作至关重要。
56 0
|
4月前
|
监控 Java API
【揭秘】如何用Flink CEP揪出那些偷偷摸摸连续登录失败的“捣蛋鬼”?——一场数据流中的侦探游戏
【8月更文挑战第26天】Flink 是一款先进的流处理框架,提供复杂事件处理(CEP)功能以识别实时数据流中的特定模式。CEP 在 Flink 中通过 `CEP` API 实现,支持基于模式匹配的事件检测。本文通过监测用户连续三次登录失败的具体案例介绍 Flink CEP 的工作原理与应用方法。首先创建 Flink 环境并定义数据源,接着利用 CEP 定义连续三次失败登录的模式,最后处理匹配结果并输出警报。Flink CEP 能够轻松扩展至更复杂的场景,如异常行为检测和交易欺诈检测等,有效应对多样化的业务需求。
47 0