Flink / Kafka - Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy 排查与修复 ———————————————— 版权声明:本文为CSDN博主「BIT_666」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。 原文链接:https://blog.csdn.net/BIT_666/article/details/125419738

简介: 使用 Flink - Kafka 接数据 Source 时程序报错:org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy,任务每次启动后持续10min左右,然后 RUNNING -> FAILED,如此重启失败了多次。

一.引言

使用 Flink - Kafka 接数据 Source 时程序报错:

org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy

image.gif

任务每次启动后持续10min左右,然后 RUNNING -> FAILED,如此重启失败了多次。

二.问题现象

1.任务 URL 界面

对应任务界面可以看到有一个 Source 的 3 个 Task 在任务启动的时间内一直处于 INITIALIZING 状态,直到任务结束。

image.gif编辑

2.yarn 界面

上述报错情况下 yarn 界面如下,任务重启后大约 7min 失败

image.gif编辑

三.问题分析与解决

1.Source 持续 INITIALIZING 与 周期性 Failed

查看异常栈日志:

image.gif编辑

A.持续 INITIALIZING

从下往上看,显示任务被远程的 taskManager close,结合上面 Source 端有 3 个 task 一直处于 INITIALIZING 状态,大概率是因为某 broker crash ,从而导致无法 ping 通该节点,导致超时无法启动,当超时时间大于 flink 规定的心跳周期,任务 INITIALIZING -> FAILED,所以出现URL 的持续 INITIALIZING。

B.周期性 Failed

再往上是:Flink Recovery is suppressed by FixedDelayRestartBackoffTime,其中 maxNumberRestartAttempt 为 3,此时我们可以到提交客户端的 flink-conf.yaml 查看对应配置:

restart-strategy: failure-rate
restart-strategy.failure-rate.failure-rate-interval: 2 min

image.gif

可以看到重启策略为 failure-rate 即周期性的重启,其中周期为 2min,结合上面 maxNumberRestartAttempt = 3,这也找到了为什么程序 7min 左右循环退出的原因了,程序有1 min 左右的申请资源和初始化启动的时间,运行期间出现故障,按照 failure-rate 策略重启共计耗时 3 x 2min = 6min,3次尝试后任务仍然无法运行( taskManger 持续 ping 不通导致故障) 达到最大重启次数,任务退出。

filure-rate 重启策略如下:

重启策略在出现故障后重新启动作业,但当超过故障率(每个时间间隔的故障数)时,作业最终会失败。在两次连续的重启尝试之间,重启策略会等待固定的时间。

image.gif编辑

也可以在代码中配置:

val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // max failures per unit
  Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
  Time.of(10, TimeUnit.SECONDS) // delay
))

image.gif

查看更多的重启策略信息可以查看官方 API:Flink 重启策略

2.问题解决

上面分析了很多,又是看日志,又是看官方 API,问题解决的方法其实很简单:

A.获取持续处于 INITIALIZING 的 task 对应的 Broker 地址

找到对应 source 对应 kafka 的 properties 配置中的 bootstrap.servers 参数即可,一般形式为 "broker1:port,broker2:port,broker3:port,..."

B.Ping 各个 broker

ping $broker

image.gif

分别 ping 对应 broker 对应机器,如果该 broker 正常会快速显示下述类似信息:

image.gif编辑

如果该 broker 异常,则 ping 后 shell 界面处于 _ 的等待状态,此时说明对应 broker 连接异常。

C.将异常 broker 从  bootstrap.servers 参数中剔除重启

一般情况下 kafka 都会有机器设置的冗余且实现互备,所以正常情况下去掉单台 broker 重启后任务不受影响

3.其他系统问题

Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy 该报错主要是告知对应任务故障重启,主要异常分析还是要依靠最开始的异常栈确定异常源头,且大部分是与 flink 相关的系统参数,配置有关,例如本例就是 flink 的 source 端无法完全启动导致。除了上述 broker crash 掉点导致的 connection refused 之外

A.时间语义不匹配

还可能因为任务的时间语义与env设置不匹配导致任务重启并最终故障:

// 事件时间 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 处理时间 
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

image.gif

B.TimeWindowAll 并行度

TimeWindowAll 的并行度默认为1,不可以修改,如果代码内 setParallelism > 1 则会在任务 submit 时直接报错

C.non serializable field 序列化

如果在 Source 函数或者其他需要序列化的类内初始化了不可序列化的变量会在任务启动时报错,需要使该变量支持序列化或采用其他方式解决

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
消息中间件 存储 传感器
323 0
|
8月前
|
消息中间件 SQL 关系型数据库
Flink CDC + Kafka 加速业务实时化
Flink CDC 是一种支持流批一体的分布式数据集成工具,通过 YAML 配置实现数据传输过程中的路由与转换操作。它已从单一数据源的 CDC 数据流发展为完整的数据同步解决方案,支持 MySQL、Kafka 等多种数据源和目标端(如 Delta Lake、Iceberg)。其核心功能包括多样化数据输入链路、Schema Evolution、Transform 和 Routing 模块,以及丰富的监控指标。相比传统 SQL 和 DataStream 作业,Flink CDC 提供更灵活的 Schema 变更控制和原始 binlog 同步能力。
|
9月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
269 11
|
9月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
621 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
11月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
1141 0
|
12月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
937 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
12月前
|
消息中间件 Kafka 流计算
docker环境安装kafka/Flink/clickhouse镜像
通过上述步骤和示例,您可以系统地了解如何使用Docker Compose安装和配置Kafka、Flink和ClickHouse,并进行基本的验证操作。希望这些内容对您的学习和工作有所帮助。
1272 28
|
存储 监控 数据处理
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
1017 61
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
857 0
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
995 0