Apache Flink 在小米的稳定性优化和实践

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 小米大数据部高级软件工程师张蛟在 FFA 2021 的演讲。

摘要:本文整理自小米大数据部高级软件工程师张蛟在 Flink Forward Asia 2021 生产实践专场的演讲。主要内容包括:

  1. 发展现状和规模
  2. 稳定性优化及实践
  3. 运维优化及实践
  4. 未来规划与展望

点击查看原文视频 & 演讲PPT

一、发展现状及规模

img

现阶段,我们的整体架构可以分成5层,数据从下往上流动,如上图。

数据采集层主要负责收集各类数据,数据的来源分为两类,一类是埋点和业务日志以及服务日志,经由 LCS Agent 进行采集,另一类是数据库数据经由 Binlog 或 Checkpoint 数据集成等方式收集到消息队列中。以 Flink、Spark 为主的计算层对其进行处理,并最终存储到各类存储和查询服务中,供业务使用。Flink 是计算层实时和准实时处理的主要框架,在其中正发挥着越来越重要的作用,尤其是 Flink+Iceberg 数据湖技术,正在让流批一体成为现实。

img

目前我们的集群上运行着 3000 多个作业,主力版本是 1.12,1.14 版本也已经合并上线,日均处理 10 万亿+ 条消息,PB 级的数据量,峰值数据 2 亿条/秒,运行在国内外 10 多个集群,使用超过 45000 个 CPU core,内存使用超过 200tb。

img

在这样规模的数据处理过程中,我们遇到了许多问题。

  • 作业内存占用不可控,on Yarn 模式非常容易出现 Yarn container OOM kill,导致 container lost,引发作业频繁重启,包括框架内重启。
  • on Yarn 模式无法支持作业自动平滑重启,在机器过保、下线、机房迁移等过程中,只能触发 failover。
  • 实时作业对负载较为敏感,启动和运行的过程中需要保证机器性能,避免因离线和在线混部造成影响。
  • Checkpoint 作为 Flink 有状态计算数据一致性的保障,存在稳定性问题。
  • historyserver 默认的清理策略不好设置,导致占用的磁盘空间比较大,访问慢。
  • 作业异常时难以确定异常原因和节点,需要查看大量的作业日志,导致故障排查困难。

二、稳定性优化及实践

img

首先是 Yarn container lost 的优化。Flink JobManager 首先会向 Yarn reCheckpointmanager 申请资源,Yarn reCheckpointmanager 为该申请分配资源后将分配信息返回给 JobManager,然后 JobManager 会根据分配信息去启动 taskmanager,并使之与 JobManager 进行心跳。

JobManager 包括 JobMaster 和 reCheckpointmanager,它会主动发送心跳请求,探测 taskmanager 是否存活。如果 taskexecutor 因为某些原因意外被 kill,JobManager 的日志中就会提示 container lost。

img

上图是 container lost 现象的提示之一,一般老版本的 Flink 中出现比较多。

img

上图是 container lost 现象的另一种提示。

img

在出现 container lost 时,如果去查看 Yarn的nodemanager 或 JobManager 中异常前后的日志,一般都可以看到类似 beyond the physical memory limit 的日志,这表明它是因为物理内存使用超限被 Yarn kill。

img

这里需要先介绍一下 Yarn 控制内存超用的方式,Yarn Nodemanager 会启动一个 containersmonitor 的线程,这个线程会定期扫描 Nodemanager 上的 container 内存占用,从而实现内存资源的隔离。

简单来说,如果某个 container 对应进程树中所有年龄大于 0 的进程,总内存使用量超过申请量的两倍,或所有年龄大于 1 的进程,总内存使用量超过上限,就表明其内存超用,需要被 kill。

但实际上这种方式存在一定的问题:

  • 一是定期扫描对于内存突增的隔离性比较差,可能还没有开始扫描就已经达到系统总内存上限,导致被系统 kill;
  • 二是 Yarn 通常会开启节点资源的超卖,此时如果所有资源都被使用,会导致节点不稳定;
  • 三是如果作业只是临时的内存需求,即使此时节点仍有富余内存,也会触发 kill。

img

针对这些问题,我们采用 Cgroup + JDK升级 + Jemalloc 的方式进行了优化。可能有人会问为什么需要进行 JDK 升级?这是因为老版本的 JDK 使用 Jemalloc 存在线程死锁的问题,另外升级最新的 JDK 也能避免其他的 JDK bug,通常这类 bug 都不容易被找到和复现。

img

Cgroup 的方式主要是开启内存软限制,它对 container 的内存限制不再是基于单个 container 的内存申请量,而是整个 Nodemanager 的内存量。这个时候如果 NodeManager 上仍有富余内存,内存超用的 container 就可以接着使用这些富余的内存。一个节点上同时存在多个 container 内存超用导致整个节点内存达到上限,才会触发 oom event。Oom listener 对该事件进行监听并判断,如果达到节点总内存就会选取内存实际占用量超过申请量且启动时间最短、优先级最低的作业触发 oom kill。

img

然而,Cgroup 只是在一定程度上解决了 container 频繁被 Yarn oom kill 导致 lost 的问题,并没有完全彻底地解决。在使用的过程中,依然存在某些 container 的内存使用持续上涨,最终被 cgroup oom kill 的情况,然后我们发现该问题可能与 glibc 的内存分配 bug 有关,长期运行的进程会存在连续多块大小为 65536 的 anon 块,所以我们最终的解决方案如下:

img

使用 Cgroup 解决内存临时超用的问题,比如 RocksDB 对内存的限制不严格、小白用户对内存的设置和使用不正确等造成的问题,然后升级 JDK 版本,解决 Jemalloc 分配时的线程死锁 bug,最后切换 Jemalloc,解决 Linux 系统下的 64M anon 分配 bug。

img

经过一系列的优化,从上图可以看出,container lost 的频率由每月的近 5000 次减少到不到 100 次,因 Yarn oom kill 造成的作业异常重启减少了 90% 以上,效果显著。

img

第二个优化实践是节点的平滑重启功能,流式作业是长时间运行的作业,由于大部分都运行在廉价的机器上,因此机器出现过保、硬件故障、维修下线、机房迁移等都比较常见。为了提前预防可能出现的隐患,避免框架重启造成的影响,提升云环境下作业的稳定性,解决 Yarn 模式下恢复时间过长带来的问题,我们开发了作业的平滑重启功能。

img

将节点加入到 exclude 后,Flink recheckpoint manager 会获取到 decommission 的信息,通过解析该信息得到对应的节点,并判断当前运行任务的 container 是否运行在被 decommission 的节点上。如果是,就通过调用任务的 JobManager 的 stop with savepoint 接口去停止。平台会自动检测任务的运行状态,如果某个作业不是通过平台停止,则平台会自动将该任务重新拉起,作业从 savepoint 恢复。这个过程会进行周期性的触发并批量合并后再处理,避免消息频繁触发造成瞬时负载压力。此外,节点和 container 都会进行去重,避免对同一任务多次触发影响稳定性。另外它的触发周期远小于 sre 在下线节点时设置的下线周期,也缓解了运维压力。

img

JobManager 会启动指标收集监控线程,并周期性地采集节点的 CPU、内存、磁盘 io 和网络 io 等指标,然后汇聚成指标集合,通过动态指标规则对指标进行判定,如果满足条件就会将其加入到节点黑名单,这样该 Application 的 container 便不会再运行在这个节点上。如果某个节点被多个 application 加入黑名单,则表明该节点可能存在问题,会自动触发作业平滑重启,并进行监控报警,以此来自动发现可能的异常节点。

img

上图是 Flink Checkpoint 的大致流程,Checkpoint coordinator 会触发 Checkpoint Operator 进行 Checkpoint,Checkpoint Operator 生成并向下游广播 Checkpoint Barrier,然后 Snapshot State。Checkpoint Operator 完成 Checkpoint 后进行 ack,下游节点收到 Checkpoint Barrier 后,根据是否要进行对齐做对应的处理,然后进入 Checkpoint 逻辑。所有的节点都向 Checkpoint Coordinateor ack 之后,表示该次 Checkpoint 已经完成,接着向所有参与 Checkpoint 的 Operator 发送完成通知,最后 Operator 做最后的提交操作等。

img

Checkpoint 过程中遇到的问题包括以下这些:

  • 磁盘满或其他 io 异常,会导致 Checkpoint 长期无法触发,但异常信息只存在于 JobManager 的日志中,并不影响作业的正常执行,导致潜在的隐患不容易被感知。
  • 作业因逻辑变更、调整并发、重新调度等原因,重启时默认不会从 Checkpoint 恢复,导致状态丢失或者消息积压。
  • 大并发度时 Checkpoint 小文件过多,引发大量的 HDFS RPC 负载压力。
  • 用户错误配置 Checkpoint 目录引发的恢复冲突非常不容易控制,也不易排查。

针对以上问题,我们也进行了一些优化。

img

针对磁盘满、io 异常、Kerberos 文件损坏的问题,我们会捕获异常栈,根据异常栈进行判断和重试,并在失败时增加 Checkpoint 的失败计数,超过一定次数则进行框架内的重启,或向用户发送告警,保证作业不会出现长时间的 Checkpoint 失败而从一个非常老的 Checkpoint 恢复。

img

针对作业重启时无法从 Checkpoint 恢复的问题,优化方式是对每个作业设置默认的保留数量,并在进行 Checkpoint 时先生成一个临时的 Checkpoint Metadata 文件,只有在 Finalize 时才会被 rename 成正式的文件。接着将所有 Checkpoint 文件按最后修改时间降序排序,在其中寻找正式的 Checkpoint Metadata 文件。如果成功则表明其是一个完备的、可用于恢复的 Checkpoint 文件。

在这样的设定下,必须确保文件最后修改时间的正确性。为此我们设置了任务 finish 默认不删除 Checkpoint 文件,任务在做 Savepoint 时默认不 discard 最新的 Checkpoint 文件,以确保这两类文件最后修改时间的正确性。通过以上方式保证了任务能自动从最新的、完备的状态进行恢复,需要重新处理的数据和状态尽量少。另外,如果任务已经找到最新的、完备的 Checkpoint 并可以用来恢复,这表明前面的 Savepoint 和 Checkpoint 已经可以清理,由此减少空间的占用。

于是我们通过为 Savepoint 设置生命周期来清理全量 Savepoint;对于增量的 Checkpoint,为了避免清除掉正在使用的状态,会先去读取其 Metadata 文件的内容,将其中用到的状态文件对应的父文件夹保留,其余的进行清理,从而确保在不影响状态恢复的前提下,尽量减少文件数和空间占用。

img

针对用户随意配置 Checkpoint 目录导致状态恢复冲突和引发负载压力的问题,通过在 Metadata 文件中增加作业名和时间戳,当前的作业名与存储的作业名不同则会提示告警信息,恢复的 Checkpoint 的时间戳与当前时间存在较大的差异,也会有告警信息。

小文件是使用 HDFS 经常遇到的问题,由于 HDFS 适合于存储大块文件,所以必须对小文件进行优化来提升性能和稳定性。方法是在进行 Checkpoint 时对小文件写入进行合并,比如将多个小文件写入到 sequence file 中,形成一个大的文件,这可能会造成空间浪费,但是对降低 HDFS Namenode 负载压力效果比较明显。

此外通过联邦集群的方式,使用多个 Namenode 均衡 RPC 请求负载,每一个 Namenode 都是一个相对独立的服务,然后对用户作业规范其 Checkpoint 目录,使其访问能够被均衡到多个 Namenode 上,再对旧的 HDFS 文件通过挂载表的形式读旧写新,逐步实现自动迁移到新的统一的规范目录下。

img

接下来介绍一个案例,该案例来自小米数据采集服务,图示是他们非常简单的架构图,主要是将多个源端 SDK 的埋点和数据收集到消息队列中,然后使用 Flink 进行 ETL,最终存储到 Doris 中并在看板上进行展示。

img

目前该业务已经接入 750+ 国内外业务,日均处理 1600亿+ 条消息。通过采用 Checkpoint 相关的优化手段,将 RPC 延迟降低了约 40%,减少了小文件。同时在作业通过 stop with savepoint 启停时,保证了恢复的正确性,确保了 exactly once 的语义。

三、运维优化实践

img

Flink Historyserver 对作业运维非常有效,尤其是它能在作业停止后,查看作业的统计信息,如果作业异常退出或处理结果有问题,我们又因为一些原因无法及时查看相关日志,就可以在将来通过 Historyserver 查看。

Flink Historyserver 会在每一次定时清理时获取上一次清理已经被缓存的作业 ID,再获取本次已经打包的历史日志信息,然后判断历史日志是否已经超过了配置的最大值,若是,就会将后面的历史日志直接执行清理,否则就会判断上一次缓存的作业在当次历史日志中是否存在,如果不存在也会执行清理。

但上述流程存在一系列的问题,一个是服务重启会造成当前缓存的已下载的作业信息丢失,如果在重启之间该作业的历史日志也丢失,就会形成悬浮的缓存作业,本地缓存的作业将会长期存在,无法清理。当前已打包的历史日志信息不支持过期,导致大量的日志存留于 HDFS 和本地磁盘,且会长期存在,不仅影响访问的速度,也会造成磁盘空间的较大浪费。缓存下来的作业历史日志最大值难以确定,基础服务如 HDFS 等如果出现异常,会导致同时出现大量失败,冲走有效日志。另外当前默认并没有记录 Taskmanager 上的日志,非常不利于异常排查,

针对上述问题我们也做了相应优化。

img

一个是读取当前已经缓存到本地磁盘的作业历史日志信息,并将其与历史日志记录进行对比,从而避免出现悬浮的缓存作业;支持历史日志的最长保留时间,超过其生命周期就会进行清理,相比于当前支持的历史日志最大保留数量,更加科学合理;另外我们也支持了 Taskmanager 和 Container 历史数据的打包和清理,更全面地记录作业在异常退出时的各项信息,方便排查问题。

img

作业的全链路心跳监控功能主要是对作业的链路延时进行监控,实现方式是通过在 Stream Checkpoint 中插入特殊标记,标记信息包括作业的名称、当前的时间,名称的生成方式是 op+operator 在整个链路的 index 以及 subtask 在 operator 的 index,非 Checkpoint 节点会在收到标记后更新名称,并用当前的时间减去 Checkpoint 插入的时间生成从 Checkpoint 到该 subtask 的耗时,并上报到 Metrics Reporter 中,最终对这些 metrics 进行计算,通过这种方式可以发现链路中的异常节点,监测作业的数据异常丢失,还能够通过心跳信息的插入频率预估其影响。

心跳标记在遇到多个下游链路时并不是随机选择链路,而是同时广播到多条链路中,因此可能会出现心跳监控标记信息过多的情况,影响正常作业的处理性能。

img

这时就出现了一个矛盾点,全链路心跳监控采样越频繁,对各节点处理性能的监控就越及时准确,但同时也会造成信息过多、影响正常数据的处理。针对这个问题,我们进行了以下三个方面的改进和处理:

  • 一是将 chain operator metrics 信息进行合并上报,因为它的监控信息基本相同,这样可以减少上报的数据量。
  • 二是通过 restful 接口动态启停监控,这样只有在有异常时才会进行采样和监控,正常情况下不影响作业的运行。
  • 三是通过对采样进行周期性的合并和处理,实现了对任务 pipeline 数据量和延迟的预估以及监控功能。

img

restful 接口动态启停监控功能不仅能动态启停心跳监控,我们发现还有其他场景也能从这个功能中受益,因此我们对其进行了扩展。简单的代码修改就能让它支持其他配置的动态调整,包括 Checkpoint 配置,如 Checkpoint 周期和超时时间,动态日志的级别等。

img

当作业出现性能或 Checkpoint 问题时,可以通过 restful 接口动态开启、问题确定后动态停止,这样就能解决心跳信息过多的问题。在负载突增、短时数据倾斜导致 Checkpoint 超时,动态调整 Checkpoint 超时时间能避免作业因 Checkpoint 超时而失败,它也能避免由于 Checkpoint 长时间不成功导致数据积压更多、数据倾斜问题更严重而陷入的死循环。同时它还能用于确定超时时间,用户可以通过动态调整的方式,不断测试最适合作业的超时时间,减少了压测过程中的作业启停次数。它也支持其他配置的调整,比如动态调整日志级别,但是需要注意的是调整后的配置并没有持久化,会因为框架重启或作业的重启而失效。

四、未来规划

img

未来,我们将在以下方面继续探索:

  • 持续开发并优化自动弹性伸缩容的功能。Flink1.13 开始提供了自动弹性伸缩容的功能,但是目前并不完善,要在生产环境上用起来还需要做不少的工作。
  • 版本收敛是很多Flink开发人员都会遇到的一个问题。Flink 社区的发展比较快,版本的发布和迭代也是非常快。为了降低运维压力,紧跟社区,这也是势在必行的。
  • 对 state 读写性能进行优化,提升大状态作业的性能。
  • Heartbeat timeout 也是目前线上对稳定性影响比较大的问题,我们也会进行跟进和优化。
  • 对作业启动和恢复性能进行优化,减少作业因各种原因造成的断流是 Flink 社区和许多业务非常关注的问题,我们同样也有面临着这样的压力。
  • 继续打磨批流融合能力,完善对 batch 模式和数据湖等的支持,也是现在的热点,我们希望能在这上面进行更多探索,从而更好地支撑业务,也让 Flink 的应用更加广泛。

点击查看原文视频 & 演讲PPT


近期热点

活动报名|9月23日 实时数仓Workshop · 北京站

Beyond Stream Processing !第四届实时计算 Flink 挑战赛启动,49 万奖金等你来拿!

5 大类应用场景,26 个大厂真实生产案例分享,2022 年度 Apache Flink 案例集发布

img

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99 元试用 实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
30天前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
588 13
Apache Flink 2.0-preview released
|
20天前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
53 4
|
1月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
68 3
|
19天前
|
存储 数据挖掘 数据处理
巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践
随着数据湖技术的发展,企业纷纷探索其优化潜力。本文分享了巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践。Paimon 支持流式和批处理,提供高性能、统一的数据访问和流批一体的优势。通过示例代码和实践经验,展示了如何高效处理实时数据,解决了数据一致性和故障恢复等挑战。
99 61
|
1月前
|
存储 运维 监控
实时计算Flink版在稳定性、性能、开发运维、安全能力等等跟其他引擎及自建Flink集群比较。
实时计算Flink版在稳定性、性能、开发运维和安全能力等方面表现出色。其自研的高性能状态存储引擎GeminiStateBackend显著提升了作业稳定性,状态管理优化使性能提升40%以上。核心性能较开源Flink提升2-3倍,资源利用率提高100%。提供一站式开发管理、自动化运维和丰富的监控告警功能,支持多语言开发和智能调优。安全方面,具备访问控制、高可用保障和全链路容错能力,确保企业级应用的安全与稳定。
38 0
|
1月前
|
存储 小程序 Apache
10月26日@杭州,飞轮科技 x 阿里云举办 Apache Doris Meetup,探索保险、游戏、制造及电信领域数据仓库建设实践
10月26日,由飞轮科技与阿里云联手发起的 Apache Doris 杭州站 Meetup 即将开启!
54 0
|
1月前
|
数据挖掘 物联网 数据处理
深入探讨Apache Flink:实时数据流处理的强大框架
在数据驱动时代,企业需高效处理实时数据流。Apache Flink作为开源流处理框架,以其高性能和灵活性成为首选平台。本文详细介绍Flink的核心特性和应用场景,包括实时流处理、强大的状态管理、灵活的窗口机制及批处理兼容性。无论在实时数据分析、金融服务、物联网还是广告技术领域,Flink均展现出巨大潜力,是企业实时数据处理的理想选择。随着大数据需求增长,Flink将继续在数据处理领域发挥重要作用。
|
2月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
16天前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
694 10
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
|
3月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多