Flink作业问题分析和调优实践

简介: 本文主要分享 Flink 的 CheckPoint 机制、反压机制及 Flink 的内存模型。对这3部分内容的熟悉是调优的前提,文章主要从以下几个部分分享:原理剖析、性能定位、经典场景调优、内存调优。

作者:李康

摘要:本文主要分享 Flink 的 CheckPoint 机制、反压机制及 Flink 的内存模型。对这3部分内容的熟悉是调优的前提,文章主要从以下几个部分分享:

  1. 原理剖析
  2. 性能定位
  3. 经典场景调优
  4. 内存调优

Checkpoint 机制

1.什么是 checkpoint

简单地说就是 Flink 为了达到容错和 exactly-once 语义的功能,定期把 state 持久化下来,而这一持久化的过程就叫做 checkpoint ,它是 Flink Job 在某一时刻全局状态的快照。

当我们要对分布式系统实现一个全局状态保留的功能时,传统方案会引入一个统一时钟,通过分布式系统中的 master 节点广播出去给每一个 slaves 节点,当节点接收到这个统一时钟时,它们就记录下自己当前的状态即可。

1 unified-clock.png

但是统一时钟的方式也存在一定的问题,某一个 node 进行的 GC 时间比较长,或者 master 与 slaves 的网络在当时存在波动而造成时钟的发送延迟或者发送失败,都会造成此 slave 和其它的机器出现数据不一致而最终导致脑裂的情况。如果我们想要解决这个问题,就需要对 master 和 slaves 做一个 HA(High Availability)。但是,一个系统越是复杂,就越不稳定且维护成本越高。

Flink 是将 checkpoint 都放进了一个名为 Barrier 的流。

2 flink-barrier.png

上图中就是一个 Barrier 的例子,从上游的第一个 Task 到下游的最后一个 Task,每次当 Task 经过图中蓝色的栅栏时,就会触发 save snapshot(快照)的功能。我们用一个例子来简单说明。

2.实例分析

3 ETL.png

这是一个简单的 ETL 过程,首先我们把数据从 Kafka 中拿过来进行一个 trans 的转换操作,然后再发送到一个下游的 Kafka

此时这个例子中没有进行 chaining 的调优。所以此时采用的是 forward strategy ,也就是 “一个 task 的输出只发送给一个 task 作为输入”,这样的方式,这样做也有一个好处就是如果两个 task 都在一个 JVM 中的话,那么就可以避免不必要的网络开销

设置 Parallism 为 2,此时的 DAG 图如下:

4 JobGraph.png

■ CK的分析过程

5 checkpoint的流程.png

每一个 Flink 作业都会有一个 JobManager ,JobManager 里面又会有一个 checkpoint coordinator 来管理整个 checkpoint 的过程,我们可以设置一个时间间隔让 checkpoint coordinator 将一个 checkpoint 的事件发送给每一个 Container 中的 source task,也就是第一个任务(对应并行图中的 task1,task2)。

当某个 Source 算子收到一个 Barrier 时,它会暂停自身的数据处理,然后将自己的当前 state 制作成 snapshot(快照),并保存到指定的持久化存储中,最后向 CheckpointCoordinator 异步发送一个 ack(Acknowledge character --- 确认字符),同时向自身所有下游算子广播该 Barrier 后恢复自身的数据处理。

每个算子按照上面不断制作 snapshot 并向下游广播,直到最后 Barrier 传递到 sink 算子,此时快照便制作完成。这时候需要注意的是,上游算子可能是多个数据源,对应多个 Barrier 需要全部到齐才一次性触发 checkpoint ,所以在遇到 checkpoint 时间较长的情况时,有可能是因为数据对齐需要耗费的时间比较长所造成的。

■ Snapshot & Recover

5 snapshot&recover1.png

如图,这是我们的Container容器初始化的阶段,e1 和 e2 是刚从 Kafka 消费过来的数据,与此同时,CheckpointCoordinator 也往它发送了 Barrier。

6 snapshot&recover2.png

此时 Task1 完成了它的 checkpoint 过程,效果就是记录下 offset 为2(e1,e2),然后把 Barrier 往下游的算子广播,Task3 的输入为 Task1 的输出,现在假设我的这个程序的功能是统计数据的条数,此时 Task3 的 checkpoint 效果就是就记录数据数为2(因为从 Task1 过来的数据就是 e1 和 e2 两条),之后再将 Barrier 往下广播,当此 Barrier 传递到 sink 算子,snapshot 就算是制作完成了。

7 snapshot&recover3.png

此时 source 中还会源源不断的产生数据,并产生新的 checkpoint ,但是此时如果 Container 宕机重启就需要进行数据的恢复了。刚刚完成的 checkpoint 中 offset为2,count为2,那我们就按照这个 state 进行恢复。此时 Task1 会从 e3 开始消费,这就是 Recover 操作。

8 snapshot&recover4.png

■ checkpoint 的注意事项

下面列举的3个注意要点都会影响到系统的吞吐,在实际开发过程中需要注意:

9 checkpoint注意事项.png

3.背压的产生及 Flink 的反压处理

在分布式系统中经常会出现多个 Task 多个 JVM 之间可能需要做数据的交换,我们使用生产者和消费者来说明这个事情。

10 背压-OOM.png

假设我现在的 Producer 是使用了无界 buffer 来进行存储,当我们的生产者生产速度远大于消费者消费的速度时,生产端的数据会因为消费端的消费能力低下而导致数据积压,最终导致 OOM 的产生。

11 背压-有界buffer.png

而就算使用了有界 buffer,同样消费者端的消费能力低下,当 buffer 被积满时生产者就会停止生产,这样还不能完全地解决我们的问题,所以就需要根据不同的情况进行调整。

Flink 也是通过有界 buffer 来进行不同 TaskManager 的数据交换。而且做法分为了静态控流和动态控流两种方式。

12 反压-静态控流.png

简单来说就是当生产者比消费者的 TPS 多时,我们采用溢写的方式,使用 batch 来封装好我们的数据,然后分批发送出去,每次发送完成后再 sleep 一段时间,这个时间的计算方式是 left(剩余的数据)/ tps,但是这个做法是很难去预估系统的情况的。

13 反压-动态控流.png

Flink 1.5 之前的流控是基于 TCP 的滑动窗口实现的,在之前的课程中已经有提到过了。而 Flink 在1.5之后已经弃用了该机制,所以这里不展开说明。在此网络模型中,数据生成节点只能通过检查当前的 channel 是否可写来决定自己是否要向消费端发送数据,它对下游数据消费端的真实容量情况一概不知。这就导致,当生成节点发现 channel 已经不可写的时候,有可能下游消费节点已经积压了很多数据。

Credit-Based 我们用下面的数据交换的例子说明:

Flink 的数据交换大致分为三种,一种是同一个 Task 的数据交换,另一种是 不同 Task 同 JVM 下的数据交换。第三种就是不同 Task 且不同 JVM 之间的交换。

14 Flink数据交换.png

同一个 Task 的数据交换就是我们刚刚提到的 forward strategy 方式,主要就是避免了序列化和网络的开销。

15 第二种数据交换.png

第二种数据交换的方式就是数据会先通过一个 record Writer ,数据在里面进行序列化之后再传递给 Result Partition ,之后数据会通过 local channel 传递给另外一个 Task 的 Input Gate 里面,再进行反序列化,推送给 Record Reader 之后进行操作。

16 第三种数据交换.png

因为第三种数据交换涉及到了不同的 JVM,所以会有一定的网络开销,和第二种的区别就在于它先推给了 Netty ,通过netty把数据推送到远程端的 Task 上。

■ Credit-Based

17 credit based.png

此时我们可以看到 event1 已经连带一个 backlog = 1 推送给了 TaskB,backlog 的作用其实只是为了让消费端感知到我们生产端的情况

18 credit based2.png

此时 event1 被 TaskB 接收后,TaskB会返回一个 ack 给 TaskA,同时返回一个credit = 3,这个是告知 TaskA 它还能接收多少条数据,Flink 就是通过这种互相告知的方式,来让生产者和消费者都能感知到对方的状态。

19 credit based3.png

此时经过一段时间之后,TaskB中的有界 buffer 已经满了,此时 TaskB回复 credit = 0 给 TaskA,此时 channel 通道将会停止工作,TaskA 不再将数据发往 TaskB。

20 credit based 4.png
21 credit based5.png

此时再经过一段时间,TaskA 中的有界 Buffer 也已经出现了数据积压,所以我们平时遇到的吞吐下降,处理延迟的问题,就是因为此时整个系统相当于一个停滞的状态,如图二示,所有的过程都被打上 “X”,表示这些过程都已经停止工作。

22 Flink内存模型.png

JVM 是一个非常复杂的系统,当其内存不足时会造成 OOM ,导致系统的崩溃。Flink 在拿到我们分配的内存之后会先分配一个 cutoff 预留内存,保证系统的安全性。Netword buffers 其实就是对应我们刚刚一直提到的有界 buffer,momery manager 是一个内存池,这部分的内存可以设置为堆内或者堆外的内存,当然在流式作业中我们一般设置其为堆外内存,而 Free 部分就是提供给用户使用的内存块。

现在我们假设分配给此 TaskManager 的内存是 8g。

23 Flink内存计算.png

  1. 首先是要砍掉 cutoff 的部分,默认是0.25,所以我们的可用内存就是 8gx0.75
  2. network buffers 占用可用内存的 0.1 ,所以是 6144x0.1
  3. 堆内/堆外内存为可用内存减去 network buffers 的部分,再乘以 0.8
  4. 给到用户使用的内存就是堆内存剩下的 0.2 那部分

其实真实情况是 Flink 是先知道了 heap 内存的大小然后逆推出其它内存的大小。

Flink 作业的问题定位

1.问题定位口诀

一压二查三指标,延迟吞吐是核心
时刻关注资源量 , 排查首先看GC。”

一压是指背压,遇到问题先看背压的情况,二查就是指 checkpoint ,对齐数据的时间是否很长,state 是否很大,这些都是和系统吞吐密切相关的,三指标就是指 Flink UI 那块的一些展示,我们的主要关注点其实就是延迟和吞吐,系统资源,还有就是 GC logs。

  • 看反压:通常最后一个被压高的 subTask 的下游就是 job 的瓶颈之一。
  • 看 Checkpoint 时长:Checkpoint 时长能在一定程度影响 job 的整体吞吐。
  • 看核心指标:指标是对一个任务性能精准判断的依据,延迟指标和吞吐则是其中最为关键的指标。
  • 资源的使用率:提高资源的利用率是最终的目的。

■ 常见的性能问题

26 Flink常见性能问题.png

简单解释一下:

  1. 在关注背压的时候大家往往忽略了数据的序列化和反序列化过程所造成的性能问题。
  2. 一些数据结构,比如 HashMap 和 HashSet 这种 key 需要经过 hash 计算的数据结构,在数据量大的时候使用 keyby 进行操作, 造成的性能影响是非常大的。
  3. 数据倾斜是我们的经典问题,后面再进行展开。
  4. 如果我们的下游是 MySQL,HBase 这种,我们都会进行一个批处理的操作,就是让数据存储到一个 buffer 里面,在达到某些条件的时候再进行发送,这样做的目的就是减少和外部系统的交互,降低网络开销的成本。
  5. 频繁 GC ,无论是 CMS 也好,G1 也好,在进行 GC 的时候,都会停止整个作业的运行,GC 时间较长还会导致 JobManager 和 TaskManager 没有办法准时发送心跳,此时 JobManager 就会认为此 TaskManager 失联,它就会另外开启一个新的 TaskManager
  6. 窗口是一种可以把无限数据切割为有限数据块的手段。比如我们知道,使用滑动窗口的时候数据的重叠问题,size = 5min 虽然不属于大窗口的范畴,可是 step = 1s 代表1秒就要进行一次数据的处理,这样就会造成数据的重叠很高,数据量很大的问题。

2.Flink 作业调优

27 数据去重.png
28 数据去重方案一.png

我们可以通过一些数据结构,比如 Set 或者 Map 来结合 Flink state 进行去重。但是这些去重方案会随着数据量不断增大,从而导致性能的急剧下降,比如刚刚我们分析过的 hash 冲突带来的写入性能问题,内存过大导致的 GC 问题,TaskManger 的失联问题。

29 数据去重方案二.png
30 数据去重方案二2.png
31 数据去重方案三.png
32 数据去重方案三2.png

方案二和方案三也都是通过一些数据结构的手段去进行去重,有兴趣的同学可以自行下去了解,在这里不再展开。

■ 数据倾斜

33 数据倾斜-影响.png

数据倾斜是大家都会遇到的高频问题,解决的方案也不少。

34 数据倾斜解决1.png

第一种场景是当我们的并发度设置的比分区数要低时,就会造成上面所说的消费不均匀的情况。

35 数据倾斜解决2.png

第二种提到的就是 key 分布不均匀的情况,可以通过添加随机前缀打散它们的分布,使得数据不会集中在几个 Task 中。

36 数据倾斜解决3.png

在每个节点本地对相同的 key 进行一次聚合操作,类似于 MapReduce 中的本地 combiner。map-side 预聚合之后,每个节点本地就只会有一条相同的 key,因为多条相同的 key 都被聚合起来了。其他节点在拉取所有节点上的相同 key 时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘 IO 以及网络传输开销。

■ 内存调优

Flink 的内存结构刚刚我们已经提及到了,所以我们清楚,调优的方面主要是针对 非堆内存 Network buffer ,manager pool 和堆内存的调优,这些基本都是通过参数来进行控制的。

37 Flink内存调优.png

这些参数我们都需要结合自身的情况去进行调整,这里只给出一些建议。而且对于 ManagerBuffer 来说,Flink 的流式作业现在并没有过多使用到这部分的内存,所以我们都会设置得比较小,不超过0.3。

38 Flink堆内存调优.png

堆内存的调优是关于 JVM 方面的,主要就是将默认使用的垃圾回收器改为 G1 ,因为默认使用的 Parallel Scavenge 对于老年代的 GC 存在一个串行化的问题,它的 Full GC 耗时较长,下面是关于 G1 的一些介绍,网上资料也非常多,这里就不展开说明了。

G1-1.png
G1-2.png
G1-3.png

总 结

本文带大家了解了 Flink 的 CheckPoint 机制,反压机制及 Flink 的内存模型和基于内存模型分析了一些调优的策略。希望能对大家有所帮助,原文分享的视频回顾可移步下方链接:

https://ververica.cn/developers/flink-training-course-operation/

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
存储 监控 数据挖掘
京东物流基于Flink & StarRocks的湖仓建设实践
本文整理自京东物流高级数据开发工程师梁宝彬在Flink Forward Asia 2024的分享,聚焦实时湖仓的探索与建设、应用实践、问题思考及未来展望。内容涵盖京东物流通过Flink和Paimon等技术构建实时湖仓体系的过程,解决复杂业务场景下的数据分析挑战,如多维OLAP分析、大屏监控等。同时,文章详细介绍了基于StarRocks的湖仓一体方案,优化存储成本并提升查询效率,以及存算分离的应用实践。最后,对未来数据服务的发展方向进行了展望,计划推广长周期数据存储服务和原生数据湖建设,进一步提升数据分析能力。
1213 1
京东物流基于Flink & StarRocks的湖仓建设实践
|
11月前
|
资源调度 Kubernetes 流计算
Flink在B站的大规模云原生实践
本文基于哔哩哔哩资深开发工程师丁国涛在Flink Forward Asia 2024云原生专场的分享,围绕Flink On K8S的实践展开。内容涵盖五个部分:背景介绍、功能及稳定性优化、性能优化、运维优化和未来展望。文章详细分析了从YARN迁移到K8S的优势与挑战,包括资源池统一、环境一致性改进及隔离性提升,并针对镜像优化、Pod异常处理、启动速度优化等问题提出解决方案。此外,还探讨了多机房容灾、负载均衡及潮汐混部等未来发展方向,为Flink云原生化提供了全面的技术参考。
601 9
Flink在B站的大规模云原生实践
|
12月前
|
SQL 存储 NoSQL
Flink x Paimon 在抖音集团生活服务的落地实践
本文整理自抖音集团数据工程师陆魏与流式计算工程冯向宇在Flink Forward Asia 2024的分享,聚焦抖音生活服务业务中的实时数仓技术演变及Paimon湖仓实践。文章分为三部分:背景及现状、Paimon湖仓实践与技术优化。通过引入Paimon,解决了传统实时数仓开发效率低、资源浪费、稳定性差等问题,显著提升了开发运维效率、节省资源并增强了任务稳定性。同时,文中详细探讨了Paimon在维表实践、宽表建设、标签变更检测等场景的应用,并介绍了其核心技术优化与未来规划。
1172 10
Flink x Paimon 在抖音集团生活服务的落地实践
|
12月前
|
资源调度 Kubernetes 调度
网易游戏 Flink 云原生实践
本文分享了网易游戏在Flink实时计算领域的资源管理与架构演进经验,从Yarn到K8s云原生,再到混合云的实践历程。文章详细解析了各阶段的技术挑战与解决方案,包括资源隔离、弹性伸缩、自动扩缩容及服务混部等关键能力的实现。通过混合云架构,网易游戏显著提升了资源利用率,降低了30%机器成本,小作业计算成本下降40%,并为未来性能优化、流批一体及智能运维奠定了基础。
681 9
网易游戏 Flink 云原生实践
|
存储 运维 监控
阿里妈妈基于 Flink+Paimon 的 Lakehouse 应用实践
本文总结了阿里妈妈数据技术专家陈亮在Flink Forward Asia 2024大会上的分享,围绕广告业务背景、架构设计及湖仓方案演进展开。内容涵盖广告生态运作、实时数仓挑战与优化,以及基于Paimon的湖仓方案优势。通过分层设计与技术优化,实现业务交付周期缩短30%以上,资源开销降低40%,并大幅提升系统稳定性和运营效率。文章还介绍了阿里云实时计算Flink版的免费试用活动,助力企业探索实时计算与湖仓一体化解决方案。
1391 3
阿里妈妈基于 Flink+Paimon 的 Lakehouse 应用实践
|
存储 SQL Java
Flink CDC + Hologres高性能数据同步优化实践
本文整理自阿里云高级技术专家胡一博老师在Flink Forward Asia 2024数据集成(二)专场的分享,主要内容包括:1. Hologres介绍:实时数据仓库,支持毫秒级写入和高QPS查询;2. 写入优化:通过改进缓冲队列、连接池和COPY模式提高吞吐量和降低延迟;3. 消费优化:优化离线场景和分区表的消费逻辑,提升性能和资源利用率;4. 未来展望:进一步简化用户操作,支持更多DDL操作及全增量消费。Hologres 3.0全新升级为一体化实时湖仓平台,提供多项新功能并降低使用成本。
916 1
Flink CDC + Hologres高性能数据同步优化实践
|
SQL 存储 调度
基于 Flink 进行增量批计算的探索与实践
基于 Flink 进行增量批计算的探索与实践
392 1
基于 Flink 进行增量批计算的探索与实践
|
存储 运维 BI
万字长文带你深入广告场景Paimon+Flink全链路探索与实践
本文将结合实时、离线数据研发痛点和当下Paimon的特性,以实例呈现低门槛、低成本、分钟级延迟的流批一体化方案,点击文章阅读详细内容~
|
SQL 弹性计算 DataWorks
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
709 6
|
SQL 存储 HIVE
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
971 2

相关产品

  • 实时计算 Flink版