Apache Flink 在小米的稳定性优化和实践

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 小米大数据部高级软件工程师张蛟在 FFA 2021 的演讲。

摘要:本文整理自小米大数据部高级软件工程师张蛟在 Flink Forward Asia 2021 生产实践专场的演讲。主要内容包括:

  1. 发展现状和规模
  2. 稳定性优化及实践
  3. 运维优化及实践
  4. 未来规划与展望

点击查看原文视频 & 演讲PPT

一、发展现状及规模

img

现阶段,我们的整体架构可以分成5层,数据从下往上流动,如上图。

数据采集层主要负责收集各类数据,数据的来源分为两类,一类是埋点和业务日志以及服务日志,经由 LCS Agent 进行采集,另一类是数据库数据经由 Binlog 或 Checkpoint 数据集成等方式收集到消息队列中。以 Flink、Spark 为主的计算层对其进行处理,并最终存储到各类存储和查询服务中,供业务使用。Flink 是计算层实时和准实时处理的主要框架,在其中正发挥着越来越重要的作用,尤其是 Flink+Iceberg 数据湖技术,正在让流批一体成为现实。

img

目前我们的集群上运行着 3000 多个作业,主力版本是 1.12,1.14 版本也已经合并上线,日均处理 10 万亿+ 条消息,PB 级的数据量,峰值数据 2 亿条/秒,运行在国内外 10 多个集群,使用超过 45000 个 CPU core,内存使用超过 200tb。

img

在这样规模的数据处理过程中,我们遇到了许多问题。

  • 作业内存占用不可控,on Yarn 模式非常容易出现 Yarn container OOM kill,导致 container lost,引发作业频繁重启,包括框架内重启。
  • on Yarn 模式无法支持作业自动平滑重启,在机器过保、下线、机房迁移等过程中,只能触发 failover。
  • 实时作业对负载较为敏感,启动和运行的过程中需要保证机器性能,避免因离线和在线混部造成影响。
  • Checkpoint 作为 Flink 有状态计算数据一致性的保障,存在稳定性问题。
  • historyserver 默认的清理策略不好设置,导致占用的磁盘空间比较大,访问慢。
  • 作业异常时难以确定异常原因和节点,需要查看大量的作业日志,导致故障排查困难。

二、稳定性优化及实践

img

首先是 Yarn container lost 的优化。Flink JobManager 首先会向 Yarn reCheckpointmanager 申请资源,Yarn reCheckpointmanager 为该申请分配资源后将分配信息返回给 JobManager,然后 JobManager 会根据分配信息去启动 taskmanager,并使之与 JobManager 进行心跳。

JobManager 包括 JobMaster 和 reCheckpointmanager,它会主动发送心跳请求,探测 taskmanager 是否存活。如果 taskexecutor 因为某些原因意外被 kill,JobManager 的日志中就会提示 container lost。

img

上图是 container lost 现象的提示之一,一般老版本的 Flink 中出现比较多。

img

上图是 container lost 现象的另一种提示。

img

在出现 container lost 时,如果去查看 Yarn的nodemanager 或 JobManager 中异常前后的日志,一般都可以看到类似 beyond the physical memory limit 的日志,这表明它是因为物理内存使用超限被 Yarn kill。

img

这里需要先介绍一下 Yarn 控制内存超用的方式,Yarn Nodemanager 会启动一个 containersmonitor 的线程,这个线程会定期扫描 Nodemanager 上的 container 内存占用,从而实现内存资源的隔离。

简单来说,如果某个 container 对应进程树中所有年龄大于 0 的进程,总内存使用量超过申请量的两倍,或所有年龄大于 1 的进程,总内存使用量超过上限,就表明其内存超用,需要被 kill。

但实际上这种方式存在一定的问题:

  • 一是定期扫描对于内存突增的隔离性比较差,可能还没有开始扫描就已经达到系统总内存上限,导致被系统 kill;
  • 二是 Yarn 通常会开启节点资源的超卖,此时如果所有资源都被使用,会导致节点不稳定;
  • 三是如果作业只是临时的内存需求,即使此时节点仍有富余内存,也会触发 kill。

img

针对这些问题,我们采用 Cgroup + JDK升级 + Jemalloc 的方式进行了优化。可能有人会问为什么需要进行 JDK 升级?这是因为老版本的 JDK 使用 Jemalloc 存在线程死锁的问题,另外升级最新的 JDK 也能避免其他的 JDK bug,通常这类 bug 都不容易被找到和复现。

img

Cgroup 的方式主要是开启内存软限制,它对 container 的内存限制不再是基于单个 container 的内存申请量,而是整个 Nodemanager 的内存量。这个时候如果 NodeManager 上仍有富余内存,内存超用的 container 就可以接着使用这些富余的内存。一个节点上同时存在多个 container 内存超用导致整个节点内存达到上限,才会触发 oom event。Oom listener 对该事件进行监听并判断,如果达到节点总内存就会选取内存实际占用量超过申请量且启动时间最短、优先级最低的作业触发 oom kill。

img

然而,Cgroup 只是在一定程度上解决了 container 频繁被 Yarn oom kill 导致 lost 的问题,并没有完全彻底地解决。在使用的过程中,依然存在某些 container 的内存使用持续上涨,最终被 cgroup oom kill 的情况,然后我们发现该问题可能与 glibc 的内存分配 bug 有关,长期运行的进程会存在连续多块大小为 65536 的 anon 块,所以我们最终的解决方案如下:

img

使用 Cgroup 解决内存临时超用的问题,比如 RocksDB 对内存的限制不严格、小白用户对内存的设置和使用不正确等造成的问题,然后升级 JDK 版本,解决 Jemalloc 分配时的线程死锁 bug,最后切换 Jemalloc,解决 Linux 系统下的 64M anon 分配 bug。

img

经过一系列的优化,从上图可以看出,container lost 的频率由每月的近 5000 次减少到不到 100 次,因 Yarn oom kill 造成的作业异常重启减少了 90% 以上,效果显著。

img

第二个优化实践是节点的平滑重启功能,流式作业是长时间运行的作业,由于大部分都运行在廉价的机器上,因此机器出现过保、硬件故障、维修下线、机房迁移等都比较常见。为了提前预防可能出现的隐患,避免框架重启造成的影响,提升云环境下作业的稳定性,解决 Yarn 模式下恢复时间过长带来的问题,我们开发了作业的平滑重启功能。

img

将节点加入到 exclude 后,Flink recheckpoint manager 会获取到 decommission 的信息,通过解析该信息得到对应的节点,并判断当前运行任务的 container 是否运行在被 decommission 的节点上。如果是,就通过调用任务的 JobManager 的 stop with savepoint 接口去停止。平台会自动检测任务的运行状态,如果某个作业不是通过平台停止,则平台会自动将该任务重新拉起,作业从 savepoint 恢复。这个过程会进行周期性的触发并批量合并后再处理,避免消息频繁触发造成瞬时负载压力。此外,节点和 container 都会进行去重,避免对同一任务多次触发影响稳定性。另外它的触发周期远小于 sre 在下线节点时设置的下线周期,也缓解了运维压力。

img

JobManager 会启动指标收集监控线程,并周期性地采集节点的 CPU、内存、磁盘 io 和网络 io 等指标,然后汇聚成指标集合,通过动态指标规则对指标进行判定,如果满足条件就会将其加入到节点黑名单,这样该 Application 的 container 便不会再运行在这个节点上。如果某个节点被多个 application 加入黑名单,则表明该节点可能存在问题,会自动触发作业平滑重启,并进行监控报警,以此来自动发现可能的异常节点。

img

上图是 Flink Checkpoint 的大致流程,Checkpoint coordinator 会触发 Checkpoint Operator 进行 Checkpoint,Checkpoint Operator 生成并向下游广播 Checkpoint Barrier,然后 Snapshot State。Checkpoint Operator 完成 Checkpoint 后进行 ack,下游节点收到 Checkpoint Barrier 后,根据是否要进行对齐做对应的处理,然后进入 Checkpoint 逻辑。所有的节点都向 Checkpoint Coordinateor ack 之后,表示该次 Checkpoint 已经完成,接着向所有参与 Checkpoint 的 Operator 发送完成通知,最后 Operator 做最后的提交操作等。

img

Checkpoint 过程中遇到的问题包括以下这些:

  • 磁盘满或其他 io 异常,会导致 Checkpoint 长期无法触发,但异常信息只存在于 JobManager 的日志中,并不影响作业的正常执行,导致潜在的隐患不容易被感知。
  • 作业因逻辑变更、调整并发、重新调度等原因,重启时默认不会从 Checkpoint 恢复,导致状态丢失或者消息积压。
  • 大并发度时 Checkpoint 小文件过多,引发大量的 HDFS RPC 负载压力。
  • 用户错误配置 Checkpoint 目录引发的恢复冲突非常不容易控制,也不易排查。

针对以上问题,我们也进行了一些优化。

img

针对磁盘满、io 异常、Kerberos 文件损坏的问题,我们会捕获异常栈,根据异常栈进行判断和重试,并在失败时增加 Checkpoint 的失败计数,超过一定次数则进行框架内的重启,或向用户发送告警,保证作业不会出现长时间的 Checkpoint 失败而从一个非常老的 Checkpoint 恢复。

img

针对作业重启时无法从 Checkpoint 恢复的问题,优化方式是对每个作业设置默认的保留数量,并在进行 Checkpoint 时先生成一个临时的 Checkpoint Metadata 文件,只有在 Finalize 时才会被 rename 成正式的文件。接着将所有 Checkpoint 文件按最后修改时间降序排序,在其中寻找正式的 Checkpoint Metadata 文件。如果成功则表明其是一个完备的、可用于恢复的 Checkpoint 文件。

在这样的设定下,必须确保文件最后修改时间的正确性。为此我们设置了任务 finish 默认不删除 Checkpoint 文件,任务在做 Savepoint 时默认不 discard 最新的 Checkpoint 文件,以确保这两类文件最后修改时间的正确性。通过以上方式保证了任务能自动从最新的、完备的状态进行恢复,需要重新处理的数据和状态尽量少。另外,如果任务已经找到最新的、完备的 Checkpoint 并可以用来恢复,这表明前面的 Savepoint 和 Checkpoint 已经可以清理,由此减少空间的占用。

于是我们通过为 Savepoint 设置生命周期来清理全量 Savepoint;对于增量的 Checkpoint,为了避免清除掉正在使用的状态,会先去读取其 Metadata 文件的内容,将其中用到的状态文件对应的父文件夹保留,其余的进行清理,从而确保在不影响状态恢复的前提下,尽量减少文件数和空间占用。

img

针对用户随意配置 Checkpoint 目录导致状态恢复冲突和引发负载压力的问题,通过在 Metadata 文件中增加作业名和时间戳,当前的作业名与存储的作业名不同则会提示告警信息,恢复的 Checkpoint 的时间戳与当前时间存在较大的差异,也会有告警信息。

小文件是使用 HDFS 经常遇到的问题,由于 HDFS 适合于存储大块文件,所以必须对小文件进行优化来提升性能和稳定性。方法是在进行 Checkpoint 时对小文件写入进行合并,比如将多个小文件写入到 sequence file 中,形成一个大的文件,这可能会造成空间浪费,但是对降低 HDFS Namenode 负载压力效果比较明显。

此外通过联邦集群的方式,使用多个 Namenode 均衡 RPC 请求负载,每一个 Namenode 都是一个相对独立的服务,然后对用户作业规范其 Checkpoint 目录,使其访问能够被均衡到多个 Namenode 上,再对旧的 HDFS 文件通过挂载表的形式读旧写新,逐步实现自动迁移到新的统一的规范目录下。

img

接下来介绍一个案例,该案例来自小米数据采集服务,图示是他们非常简单的架构图,主要是将多个源端 SDK 的埋点和数据收集到消息队列中,然后使用 Flink 进行 ETL,最终存储到 Doris 中并在看板上进行展示。

img

目前该业务已经接入 750+ 国内外业务,日均处理 1600亿+ 条消息。通过采用 Checkpoint 相关的优化手段,将 RPC 延迟降低了约 40%,减少了小文件。同时在作业通过 stop with savepoint 启停时,保证了恢复的正确性,确保了 exactly once 的语义。

三、运维优化实践

img

Flink Historyserver 对作业运维非常有效,尤其是它能在作业停止后,查看作业的统计信息,如果作业异常退出或处理结果有问题,我们又因为一些原因无法及时查看相关日志,就可以在将来通过 Historyserver 查看。

Flink Historyserver 会在每一次定时清理时获取上一次清理已经被缓存的作业 ID,再获取本次已经打包的历史日志信息,然后判断历史日志是否已经超过了配置的最大值,若是,就会将后面的历史日志直接执行清理,否则就会判断上一次缓存的作业在当次历史日志中是否存在,如果不存在也会执行清理。

但上述流程存在一系列的问题,一个是服务重启会造成当前缓存的已下载的作业信息丢失,如果在重启之间该作业的历史日志也丢失,就会形成悬浮的缓存作业,本地缓存的作业将会长期存在,无法清理。当前已打包的历史日志信息不支持过期,导致大量的日志存留于 HDFS 和本地磁盘,且会长期存在,不仅影响访问的速度,也会造成磁盘空间的较大浪费。缓存下来的作业历史日志最大值难以确定,基础服务如 HDFS 等如果出现异常,会导致同时出现大量失败,冲走有效日志。另外当前默认并没有记录 Taskmanager 上的日志,非常不利于异常排查,

针对上述问题我们也做了相应优化。

img

一个是读取当前已经缓存到本地磁盘的作业历史日志信息,并将其与历史日志记录进行对比,从而避免出现悬浮的缓存作业;支持历史日志的最长保留时间,超过其生命周期就会进行清理,相比于当前支持的历史日志最大保留数量,更加科学合理;另外我们也支持了 Taskmanager 和 Container 历史数据的打包和清理,更全面地记录作业在异常退出时的各项信息,方便排查问题。

img

作业的全链路心跳监控功能主要是对作业的链路延时进行监控,实现方式是通过在 Stream Checkpoint 中插入特殊标记,标记信息包括作业的名称、当前的时间,名称的生成方式是 op+operator 在整个链路的 index 以及 subtask 在 operator 的 index,非 Checkpoint 节点会在收到标记后更新名称,并用当前的时间减去 Checkpoint 插入的时间生成从 Checkpoint 到该 subtask 的耗时,并上报到 Metrics Reporter 中,最终对这些 metrics 进行计算,通过这种方式可以发现链路中的异常节点,监测作业的数据异常丢失,还能够通过心跳信息的插入频率预估其影响。

心跳标记在遇到多个下游链路时并不是随机选择链路,而是同时广播到多条链路中,因此可能会出现心跳监控标记信息过多的情况,影响正常作业的处理性能。

img

这时就出现了一个矛盾点,全链路心跳监控采样越频繁,对各节点处理性能的监控就越及时准确,但同时也会造成信息过多、影响正常数据的处理。针对这个问题,我们进行了以下三个方面的改进和处理:

  • 一是将 chain operator metrics 信息进行合并上报,因为它的监控信息基本相同,这样可以减少上报的数据量。
  • 二是通过 restful 接口动态启停监控,这样只有在有异常时才会进行采样和监控,正常情况下不影响作业的运行。
  • 三是通过对采样进行周期性的合并和处理,实现了对任务 pipeline 数据量和延迟的预估以及监控功能。

img

restful 接口动态启停监控功能不仅能动态启停心跳监控,我们发现还有其他场景也能从这个功能中受益,因此我们对其进行了扩展。简单的代码修改就能让它支持其他配置的动态调整,包括 Checkpoint 配置,如 Checkpoint 周期和超时时间,动态日志的级别等。

img

当作业出现性能或 Checkpoint 问题时,可以通过 restful 接口动态开启、问题确定后动态停止,这样就能解决心跳信息过多的问题。在负载突增、短时数据倾斜导致 Checkpoint 超时,动态调整 Checkpoint 超时时间能避免作业因 Checkpoint 超时而失败,它也能避免由于 Checkpoint 长时间不成功导致数据积压更多、数据倾斜问题更严重而陷入的死循环。同时它还能用于确定超时时间,用户可以通过动态调整的方式,不断测试最适合作业的超时时间,减少了压测过程中的作业启停次数。它也支持其他配置的调整,比如动态调整日志级别,但是需要注意的是调整后的配置并没有持久化,会因为框架重启或作业的重启而失效。

四、未来规划

img

未来,我们将在以下方面继续探索:

  • 持续开发并优化自动弹性伸缩容的功能。Flink1.13 开始提供了自动弹性伸缩容的功能,但是目前并不完善,要在生产环境上用起来还需要做不少的工作。
  • 版本收敛是很多Flink开发人员都会遇到的一个问题。Flink 社区的发展比较快,版本的发布和迭代也是非常快。为了降低运维压力,紧跟社区,这也是势在必行的。
  • 对 state 读写性能进行优化,提升大状态作业的性能。
  • Heartbeat timeout 也是目前线上对稳定性影响比较大的问题,我们也会进行跟进和优化。
  • 对作业启动和恢复性能进行优化,减少作业因各种原因造成的断流是 Flink 社区和许多业务非常关注的问题,我们同样也有面临着这样的压力。
  • 继续打磨批流融合能力,完善对 batch 模式和数据湖等的支持,也是现在的热点,我们希望能在这上面进行更多探索,从而更好地支撑业务,也让 Flink 的应用更加广泛。

点击查看原文视频 & 演讲PPT


近期热点

活动报名|9月23日 实时数仓Workshop · 北京站

Beyond Stream Processing !第四届实时计算 Flink 挑战赛启动,49 万奖金等你来拿!

5 大类应用场景,26 个大厂真实生产案例分享,2022 年度 Apache Flink 案例集发布

img

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99 元试用 实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
打赏
0
0
0
0
36028
分享
相关文章
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
360 33
The Past, Present and Future of Apache Flink
【实践】基于Hologres+Flink搭建GitHub实时数据查询
本文介绍了如何利用Flink和Hologres构建GitHub公开事件数据的实时数仓,并对接BI工具实现数据实时分析。流程包括创建VPC、Hologres、OSS、Flink实例,配置Hologres内部表,通过Flink实时写入数据至Hologres,查询实时数据,以及清理资源等步骤。
优化Apache Kafka性能:最佳实践与调优策略
【10月更文挑战第24天】作为一名已经对Apache Kafka有所了解并有实际使用经验的开发者,我深知在大数据处理和实时数据流传输中,Kafka的重要性不言而喻。然而,在面对日益增长的数据量和业务需求时,如何保证系统的高性能和稳定性成为了摆在我们面前的一个挑战。本文将从我的个人视角出发,分享一些关于如何通过合理的配置和调优来提高Kafka性能的经验和建议。
117 4
基于 Flink 进行增量批计算的探索与实践
本文整理自阿里云高级技术专家、Apache Flink PMC朱翥老师在Flink Forward Asia 2024的分享,内容分为三部分:背景介绍、工作介绍和总结展望。首先介绍了增量计算的定义及其与批计算、流计算的区别,阐述了增量计算的优势及典型需求场景,并解释了为何选择Flink进行增量计算。其次,详细描述了当前的工作进展,包括增量计算流程、执行计划生成、控制消费数据量级及执行进度记录恢复等关键技术点。最后,展示了增量计算的简单示例、性能测评结果,并对未来工作进行了规划。
445 6
基于 Flink 进行增量批计算的探索与实践
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
440 2
探索Flink动态CEP:杭州银行的实战案例
Flink CDC 在阿里云实时计算Flink版的云上实践
本文整理自阿里云高级开发工程师阮航在Flink Forward Asia 2024的分享,重点介绍了Flink CDC与实时计算Flink的集成、CDC YAML的核心功能及应用场景。主要内容包括:Flink CDC的发展及其在流批数据处理中的作用;CDC YAML支持的同步链路、Transform和Route功能、丰富的监控指标;典型应用场景如整库同步、Binlog原始数据同步、分库分表同步等;并通过两个Demo展示了MySQL整库同步到Paimon和Binlog同步到Kafka的过程。最后,介绍了未来规划,如脏数据处理、数据限流及扩展数据源支持。
151 0
Flink CDC 在阿里云实时计算Flink版的云上实践
【开发者评测】实时计算Flink场景实践和核心功能体验测评获奖名单公布!
【开发者评测】实时计算Flink场景实践和核心功能体验测评获奖名单公布!
105 1
场景实践 | 基于Flink+Hologres搭建GitHub实时数据分析
基于Flink和Hologres构建的实时数仓方案在数据开发运维体验、成本与收益等方面均表现出色。同时,该产品还具有与其他产品联动组合的可能性,能够为企业提供更全面、更智能的数据处理和分析解决方案。
实时计算Flink场景实践和核心功能体验
本文详细评测了阿里云实时计算Flink版,从产品引导、文档帮助、功能满足度等方面进行了全面分析。产品界面设计友好,文档丰富实用,数据开发和运维体验优秀,具备出色的实时性和动态扩展性。同时,提出了针对业务场景的改进建议,包括功能定制化增强、高级分析功能拓展及可视化功能提升。文章还探讨了产品与阿里云内部产品及第三方工具的联动潜力,展示了其在多云架构和跨平台应用中的广阔前景。
140 9
实时计算Flink场景实践
在数字化时代,实时数据处理愈发重要。本文分享了作者使用阿里云实时计算Flink版和流式数据湖仓Paimon的体验,展示了其在电商场景中的应用,包括数据抽取、清洗、关联和聚合,突出了系统的高效、稳定和低延迟特点。
84 0

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多
    AI助理

    你好,我是AI助理

    可以解答问题、推荐解决方案等