Flink - checkpoint Failure reason: Not all required tasks are currently running

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Flink 程序增加 readFile 生成文件流后,最初运行期间 CheckPoint 存储没有问题,待文件流 Finished 后 CheckPoint 存储报错: checkpoint Failure reason: Not all required tasks are currently running,下面分析并解决下。

一.引言

Flink 程序增加 readFile 生成文件流后,最初运行期间 CheckPoint 存储没有问题,待文件流 Finished 后 CheckPoint 存储报错: checkpoint Failure reason: Not all required tasks are currently running,下面分析并解决下。

image.gif编辑

二.错误分析与解决

1.问题排查

Flink 场景下实现多流 uinon 并存储 ValueState,除了实时流的内容外,还需要将一个固定文件的内容进行 union 处理,所以出现如下 Overview:

image.gif编辑

最左边的 Source: Custom File Source 就是单独增加的文件流,由于是固定文件且逻辑简单,所以执行开始时 Busy 打的比较高,在此期间查看 RockerDB 指定 path 下存储的 chk 是没有问题的。

image.gif编辑

但是由于固定文件内数据量有限,处理完毕后,该 File Source 由 Running 切换至 Finished 状态:

image.gif编辑

此时由于有 Finshed 的节点,所以会空闲一些 Task,这时候在看 checkpoint 存储的报错原因:

Not all required tasks are currently running

问题应该就出在这里了,由于 File Source 执行完毕后状态转为 Finished,从而导致有 task 状态转换,而存储 checkpoint 需要所有 task 处于 Running 状态,从而导致存储 checkpoint 报错,不过这里存储出问题,不会影响任务的执行。

image.gif编辑 出错后 checkpoint 大小变为 0。

2.问题分析

问题:SourceFile 执行完毕由 Running 切换至 Finished 导致 checkpoint 执行失败

解决:只需让 SourceFile 保持 Running 状态即可

image.gif编辑

readFile 支持额外参数 watchType 与 interval,博主之前在在Flink - DataStream 获取数据总结一文中对下述参数进行了分析,这里直接搬运:

image.gif编辑

WatchType 分为 PROCESS_CONTINUOUSLY 和 PROCESS_ONCE。

PROCESS_CONTINUOUSLY : 根据 interval 间隔扫描文件检测其状态,当文件被修改后会重新处理文件内容,这将打破 exactly-once 语义。

PROCESS_ONCE:示例中默认使用该模式,该模式下只读取一遍随后任务结束。关闭 source 会导致在那之后不会再有检查点。

3.问题解决

通过前两步的分析,解决任务的办法很简单了,只需在 readFile 时修改 WatchType 为 PROCESS_CONTINUES 即可保证 Source File 处于 Running 状态。

.readFile(new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(path), schema), path)

image.gif

修改后 ↓↓↓

.readFile(new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(path), schema), 
path, 
FileProcessingMode.PROCESS_CONTINUOUSLY, 
86400L)

image.gif

修改后再次提交任务:

image.gif编辑

PROCESS_ONCE 执行时 Source File 5min 左右会进入 Finished 状态,修改后 30min 仍然处于 Running 状态。

三.新的问题

1.问题排查

image.gif编辑

上述修改提交后,程序正常运行一段时间后,整个执行视图丢失 WaterMark,程序不再 sendRecord 并卡住。可以看到 Co-Process-Broadcast-keyed 流接到 7770550 条数据但一条数据也没有发送,查看上面 window 执行窗口也没有 Watermark 显示,所以数据不发送原因应该是 WaterMark 不更新从而导致窗口不触发所以囤积数据。

image.gif编辑

再查看上述界面发现 Source File 的 RecordsReceive 和 RecordsSent 均为0,程序最开始处理文件是有内容的且这两个参数也有值,这里没值应该是重新扫描文件导致没有数据从而产生空流,进而导致 WaterMark 消失。没有 WaterMark 导致任务停滞可以参考:Flink - 新增 BroadcastStream 无 watermark 导致数据流异常

2.问题分析

.readFile(new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(path), schema), 
path, 
FileProcessingMode.PROCESS_CONTINUOUSLY, 
86400L)

image.gif

由于我的文件为固定文件即当天不会更新,所以我设置 Watch 检测文件的间隔为 86400,正好是一天的秒数,这里我怀疑文件重新扫描的 interval 参数我配置有问题,遂查看源码:

image.gif编辑

这里 sleep 采用 Thread.sleep 执行,其单位为 mills,所以这里如果设置一天应该改为 86400000 才对。

3.问题解决 (👍)

这里只需把 readFile 的 interval 参数调大即可:

.readFile(new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(path), schema), 
path, 
FileProcessingMode.PROCESS_CONTINUOUSLY, 
86400000L)

image.gif

当然,如果你的文件不是按天更新或者很长一段时间才更新,这个 interval 参数可以设置的很大,从而将对应 Thread 挂起不影响整体任务。调大该参数后,任务不再中途丢失 WaterMark 卡住,正常执行。

image.gif编辑

四.问题总结

此次问题的发生还是对基本的 API 掌握不熟悉导致,从而会出现漏掉 WatchType 参数,写错 interval 参数单位的情况,还是要多多学习 API,查看源码与问题分析与记录。下面是源码基于 readFile 参数的解释,有需要的同学也可以到官方 API 系统学习一下。

image.gif编辑

虽然最终解决方案只是增加两个参数,但是分析与排查的过程也很重要。🤓

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
容灾 流计算
美团 Flink 大作业部署问题之 Checkpoint 跨机房副本的制作能力如何实现
美团 Flink 大作业部署问题之 Checkpoint 跨机房副本的制作能力如何实现
125 1
|
9月前
|
存储 监控 算法
Flink 四大基石之 Checkpoint 使用详解
Flink 的 Checkpoint 机制通过定期插入 Barrier 将数据流切分并进行快照,确保故障时能从最近的 Checkpoint 恢复,保障数据一致性。Checkpoint 分为精确一次和至少一次两种语义,前者确保每个数据仅处理一次,后者允许重复处理但不会丢失数据。此外,Flink 提供多种重启策略,如固定延迟、失败率和无重启策略,以应对不同场景。SavePoint 是手动触发的 Checkpoint,用于作业升级和迁移。Checkpoint 执行流程包括 Barrier 注入、算子状态快照、Barrier 对齐和完成 Checkpoint。
1548 20
|
消息中间件 监控 Java
实时计算 Flink版产品使用问题之该如何解决checkpoint频繁失败
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
关系型数据库 MySQL 数据处理
实时计算 Flink版产品使用问题之mini-cluster模式下,怎么指定checkpoint的时间间隔
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
容灾 流计算
美团 Flink 大作业部署问题之Checkpoint 的 metadata 文件包含什么信息
美团 Flink 大作业部署问题之Checkpoint 的 metadata 文件包含什么信息
179 1
|
存储 调度 流计算
Flink 新一代流计算和容错问题之如何实现 Generalized Log-Based Incremental Checkpoint
Flink 新一代流计算和容错问题之如何实现 Generalized Log-Based Incremental Checkpoint
116 1
|
存储 缓存 数据处理
Flink 新一代流计算和容错问题之中间数据流动缓慢导致 Checkpoint 慢的问题要如何解决
Flink 新一代流计算和容错问题之中间数据流动缓慢导致 Checkpoint 慢的问题要如何解决
136 1
|
存储 分布式计算 算法
Flink四大基石——4.Checkpoint容错机制
Flink四大基石——4.Checkpoint容错机制
301 1
|
Java 关系型数据库 数据库
实时计算 Flink版操作报错合集之拉取全量数据时,如何解决Checkpoint失败并且报错为 "java.lang.OutOfMemoryError: Java heap space"
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
SQL 消息中间件 对象存储
实时计算 Flink版产品使用问题之checkpoint从几百毫秒突然变成10分钟失败,是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。