记一次Flink 消费Kafka数据积压排查解决

简介: 记一次Flink 消费Kafka数据积压排查解决

1:背景

接手了一个问题排查的工作,有个Flink任务每天不定时会出现数据积压,无论是白天还是数据量很少的夜里,且积压的数据量会越来越多,得不到缓解,只能每日在积压告警后重启,重启之后消费能力一点毛病没有,积压迅速缓解,然而,问题会周而复始的出现,无论是周末还是节假日,忍不了。

2:现象

1:当积压时,最明显的是kafka积压不断升高

60a6bcefe26f4b118e50f46e4d0afd1d.png

2. Flink ProcessFunction(主要处理逻辑)中多个代码块处理时间变长


       为了定位问题,在processFunction多个代码块加了处理时间的计算,结果发现,无论是简单的json处理部分还是与外部Redis,Mysql交互部分,都会有执行时间久的记录,另外这个任务Sink的地方是神策接口,接口设置的有超时时间,所以也会看到很多Sink TimeOut的记录

60a6bcefe26f4b118e50f46e4d0afd1d.png

3:解决历程

1. 调整读取kafka消息数量

ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"300"

2.调整Sink端接口的超时时间等

3. 调整任务资源

上面三个调整,前面两个没啥用,后面这个任务由原来一天一次不定时积压变成了2-3天积压

问题还是要解决的,彻底解决的

其实一直没找到真正的原因

后面发现了,当任务积压时,TaskManager所在的机器CPU会突然升高,且一直持续,直到任务重启

60a6bcefe26f4b118e50f46e4d0afd1d.png

好吧,这里就是最终的决赛场了

4:问题解决

 

接下来就是分析CPU升高的原因,可以参考另外一篇博客线上java程序CPU占用过高问题排查_vioao的博客-CSDN博客_java程序cpu占用过高

去查看任务pid的一些相关信息,这里放两张图

60a6bcefe26f4b118e50f46e4d0afd1d.png

75f0e2306cfe4b549332ab598e15c984.png

这么频繁的FGC,那问题就很明显了,FGC的时候,CPU升高,对应代码里CPU片段走到哪就停到哪,然后这一块的执行时间就变成,有的是5s,有的是15s


最后就是分析频繁FGC的原因,从上面第一张图也大概能看出来,ResultSetImpl是执行Mysql查询结果返回的对象类型,为了问题的排查,还是借助一下专业的工具


把内存文件dump下来分析一下

60a6bcefe26f4b118e50f46e4d0afd1d.png

排查代码,发现与外部Mysql交互的时候,前面开发的同学大意,没做close,好吧,加上吧

finally {
      if (rs != null) {
        try {
          rs.close();
        } catch (SQLException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
      }
      if (prepStatement != null) {
        try {
          prepStatement.close();
        } catch (SQLException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
      }
      if (conn != null) {
        try {
          conn.close();
        } catch (SQLException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
      }
    }

太不容易了,这么一个小问题折磨了前面同事这么久,到此结束。






相关文章
|
4月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
757 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
消息中间件 存储 传感器
324 0
|
6月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
408 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
6月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
952 43
|
6月前
|
SQL 关系型数据库 Apache
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
本文将深入解析 Flink-Doris-Connector 三大典型场景中的设计与实现,并结合 Flink CDC 详细介绍了整库同步的解决方案,助力构建更加高效、稳定的实时数据处理体系。
2623 0
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
|
7月前
|
存储 消息中间件 搜索推荐
京东零售基于Flink的推荐系统智能数据体系
摘要:本文整理自京东零售技术专家张颖老师,在 Flink Forward Asia 2024 生产实践(二)专场中的分享,介绍了基于Flink构建的推荐系统数据,以及Flink智能体系带来的智能服务功能。内容分为以下六个部分: 推荐系统架构 索引 样本 特征 可解释 指标 Tips:关注「公众号」回复 FFA 2024 查看会后资料~
481 1
京东零售基于Flink的推荐系统智能数据体系
|
8月前
|
消息中间件 SQL 关系型数据库
Flink CDC + Kafka 加速业务实时化
Flink CDC 是一种支持流批一体的分布式数据集成工具,通过 YAML 配置实现数据传输过程中的路由与转换操作。它已从单一数据源的 CDC 数据流发展为完整的数据同步解决方案,支持 MySQL、Kafka 等多种数据源和目标端(如 Delta Lake、Iceberg)。其核心功能包括多样化数据输入链路、Schema Evolution、Transform 和 Routing 模块,以及丰富的监控指标。相比传统 SQL 和 DataStream 作业,Flink CDC 提供更灵活的 Schema 变更控制和原始 binlog 同步能力。
|
9月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
270 11
|
9月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
621 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
SQL 流计算 消息中间件
Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL
《Flink SQL 1.9.0 技术内幕和最佳实践》,许多小伙伴对演示环节的 Demo 代码非常感兴趣,迫不及待地想尝试下,所以写了这篇文章分享下这份代码。
Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL