Flink在B站的大规模云原生实践

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文整理自哗哩哗哩资深开发工程师丁国涛老师,在FlinkForwardAsia2024云原生专场的分享。内容分为以下五个部分:1.背景介绍2.功能及稳定性优化3.性能优化4.运维优化5.未来展望

01背景介绍

1.1 云原生化的优势

在Flink建设初期,由于Flink与Hadoop结合比较好,所以采用Flink On YARN架构。但是Flink On YARN遇到一些问题,主要有三个方面:

  1. 原先YARN集群是独占集群,资源使用率不高。由于Flink作业本身流量上有波峰波谷,用户为了避免Flink作业产生的Lag堆积,所以会按照波峰资源去提交Flink作业,这样会导致在非波峰时间里整个Flink作业的资源使用率不高。
  2. 运行时环境的一致性。YARN上使用的是物理机, Flink运行依赖于物理机环境。如果产生环境问题,本地较难复现线上的环境,导致这类问题难排查。另外一些高级用户对基础环境是有要求的,这部分就会难以满足。
  3. YARN上资源隔离性差,作业之间容易产生影响。

随着K8S容器化技术的发展,上面的问题得到了解决。

  1. 资源池统一。目前B站等很多业务都在K8S上运行,可以实现资源池的统一,特别是一些Spark作业也运行在整个K8S上,未来可以实时和离线混部,提升资源利用率。
  2. Docker容器化的技术隔离性好,实现本地环境和线上环境的一致性,也可以按用户需要定制自己的环境。
  3. 和公司进行统一基础底座、运维管理和繁荣的生态。


1.2 系统设计

下面介绍目前对整个K8S的系统设计。

为了对用户提供一致性体验,按照队列形式来区分不同集群,用户只需要提交到K8S对应的队列就可以迁移到K8S上。整个实时平台会监控作业的状态来保证作业的稳定性。


1.3 当前规模

  • B站使用Flink比较早,随着业务的发展,Streaming作业超过6500个,Batch作业超过3000个。60%的Streaming作业中已经迁移到了K8S上。
  • 单作业状态最大超过30TB,作业并行度最大超过6000。
  • 部署模式上Streaming和Batch统一采用Native Flink On K8S。Streaming和Batch混部, Batch作业也和Spark作业混部。


02功能及稳定性优化

下面介绍第二部分功能及稳定性优化。

介绍B站现在迁移K8S容器化过程中做的一些改造和遇到的问题与优化方案。

2.1 Flink镜像

首先相对社区的Flink镜像做了一个定制。

  1. 镜像分层。最初Flink镜像大小超过3GB,拉取时间比较长。为了增加镜像复用,做了镜像分层:Base镜像和发版镜像,Base镜像一般不变。发版镜像如右图,只包含一些Flink版本,拉取一个增量镜像的耗时就会变小。
  2. Base镜像中添加一些arthas等一些线上出问题时的调试组件。
  3. 相比社区镜像删除一些冗余模块,例如example、opt,进一步减少整个镜像的体积。
  4. 增加了usrlib目录,这个目录在Flink中比较特殊。Flink在task运行前会加载usrlib目录来生成用户的userClasspath。我们利用该特性将一些SQL依赖放入usrlib中,这样SQL作业提交时就不需要再重复下载相关的依赖,只需要下载UDF资源,就可以运行作业。usrlib目录中包含flink-bilibili-starter(所有SQL作业的入口)和table api(整个Flink版本编译出的依赖包)。如果默认加载该目录,对DataStream作业不太友好。DataStream作业本身不需要这些依赖,加载这些依赖可能会产生类冲突。所以针对DataStream作业运行时不加载usrlib目录,只需要加载appalication jar即可。


2.2 支持本地路径挂载

下面介绍下云原生的本地磁盘选择。

目前的背景是K8S集群的机器是从YARN机器迁移而来,这些机器具有很多磁盘。为了充分利用这些磁盘,采用本地磁盘挂载的方式挂载物理机上的磁盘,通过hostpath方式为Pod挂载磁盘。

集群中机器间的磁盘是异构的,也可能会遇到一些磁盘故障或磁盘权限的问题等,就会导致启动失败。目前的方案统一所有磁盘挂载的根目录,例如该机器上有十几个盘,都统一挂载到/mnt子目录下。这样Flink启动Pod时只需要挂载/mnt目录即可。Flink进程启动时会去历史/mnt子目录通过创建临时目录的方式检测磁盘是否可用,如果可用就会将所有可用磁盘更新在配置文件中,后续其它组件就可以方便使用。

针对一些大状态的作业,配置的是Rocksdb state backend。在默认模式下RocksDB会随机选择可用的磁盘,可能会造成磁盘热点。做的一个优化是在RocksDB进行磁盘选择时会根据当前的磁盘IO性能和磁盘容量去选择最优磁盘。

使用hostpath挂载的磁盘时在退出后不会主动删除数据。这方面做的一个改进是在Pod中增加lifecycle,在Pod退出时通过preStop方式清理磁盘上的数据。


2.3 JobManager Failover

下面介绍遇到的一些问题。JobManager Pod是由Deployment生成的,在JM出现问题时,Deployment为了维持整个Deployment的副本数,会重新拉取JM的Pod,会导致两个问题:

  • 关闭HA时,拉起的新的JM的Pod会被当成作业的首次启动,从作业首次启动的Checkpoint/Savepoint恢复,会导致大量的数据重复。
  • 开启HA时, Checkpoint/Savepoint的信息会存放在HA数据中。当JM恢复时,会去解析HA数据,这样可以正确加载之前的Checkpoint路径,但是如果遇到JobManager启动过程中的报错时,JM会陷入到无限重试。

这两个问题在YARN上容易解决。YARN上有两个参数yarn.application-attempts,yarn.application-attempt-failures-validity-interval,可以允许YARN上的AppMaster在一定时间内的失败次数。如果关闭HA,默认为1,可以较好兼顾JM的重试、无限失败和恢复。

我们参考Flink On YARN的模式,在K8S上做了一些优化。在JM启动时判断Pod中的restart次数,如果超出阈值就认为该JM是一个错误恢复,会停止该作业。如果没有开启HA默认为1。这种能涵盖线上大多数的问题。

但是目前该方案还存在问题:如果遇到一些需要重新调度Pod的场景时,Pod的restart次数重置导致这个判断失效。


2.4 异常Pod

下面介绍另一个问题异常Pod的处理。

在整个K8S上如果出现机器故障或者kubelet进程异常等情况会引起Pod异常。

当Pod出现异常时有两个状态:Pod还在运行或者已经失败。

运行状态下,整个Flink作业处于非健康的状态。应对的方案是

  • 如果该Pod是一个JM Pod,就会将Flink进程结束掉,通过实时平台统一拉起作业,重起作业。
  • 如果是一个TM的Pod,就会热迁移该Pod,作业不会失败。

当Pod处于失败状态下:

  • 通常作业已经失败。Flink对于这种作业失败比较敏感,网络栈上的感知,或者JM与TM的心跳超时都会触发作业的失败。
  • 如果机器异常,当前的状态无法汇报到API Server中,所以K8S会将该Pod标记为Unknown状态。在内部实时平台中也有一些作业双跑检测的兜底措施,会误认为作业在双胞,而将作业杀死。另外太多Unknown Pod也会对K8S有性能方面的影响。


2.5 异常TM Pod处理

当异常Pod是TM 时的解决方案。

在TM Pod Watcher中监听TM Pod的状态变化,关注出现的NodeLost事件,当收到到NodeLost时:

  1. 先强制删除Pod(一般删除无法删除Pod的记录,APIServer中标记Pod为Unknown状态),
  2. 如果Pod是Running状态会进行该Pod热迁移,尽量减少作业重启对用户的影响。热迁移是先将该Pod添加到JobManager黑名单中,然后申请资源。由于之后要迁移走该Pod,这里提前申请对应资源可以续降低后续重启耗时。申请成功后触发一个Checkpoint,这样重启时可以减少数据重复,如果Checkpoint成功就会触发作业重启。最后释放掉之前有问题的Pod。


2.6 异常JM Pod处理

异常Pod是JM 的解决方案。

  • 如果JM Pod还在运行会将Flink作业取消再重新拉起;
  • 如果整个JM Pod已经通过K8S恢复,在K8S恢复时,根据cluster-id去获取JM Pod的状态,如果异常的Pod会强制删除来规避异常状态。


2.7 资源不足

下面是遇到的Service资源不足。

Flink On K8S中有两个Service,一个是internal service用来JM Pod与TM Pod的通信,如果开启HA,默认不生成internal service。另一个是Rest Service用来访问Flink 的外部UI。

K8S提供了四种service。

  1. NodePort:在所有K8S节点中开放一个端口,可以用ip和端口访问服务。
  2. ClusterIP,通过集群内部的IP+Port访问,但该IP一般是内部IP,需要与Ingress结合使用。
  3. Headless ClusterIP,与ClusterIP类似,但没有内部IP。
  4. LoadBalancer,需要与云厂商的负载均衡器结合使用。

最初我们使用的是NodePort方案,存在问题是:它在所有节点上占用一个端口,默认在K8S配置的范围是30000-32767,随着作业数量的增加,发现端口资源不足。

目前的改进是

  1. 使用Host网络,由于Flink对于网络的要求较高,目前直接采用Host网络。Host网络模式下,作业启动后会生成随机端口,然后将端口更新到Rest Service中,最终生成的端口在Rest Service中可以查到。
  2. 由于没有使用内部网络而是Host网络,所以不需要ClusterIP,使用的Headless ClusterIP。
  3. 实时平台通过解析Rest Endpoint去获取IP和point,获取真实JM真实生成的WebUI地址。


2.8 Pod线程数隔离

下面的问题是Pod线程数隔离。

我们在生产环境遇到了另一个问题,Flink作业报错无法创建线程了,原因是Linux中对Pid有限制,Linux进程或者Java线程都会占用一个Pid,当Flink的出现线程泄露时,会大量消耗Pid,影响到同服务器的其它的作业无法创建线程。

我们的改进是增加了Pod Pid的数量限制。在Flink启动时设置Pid数量的阈值,如果超出阈值就会kill Pod。

另外一个是Flink的资源规格主要有两种:2C8G和5C20G。不同资源规格的TM所需要的Pid的上限不同。通常Flink需要的线程数量,和他task数量有紧密关系。我们就在Flink启动时根据numberOfTaskSlots来调整阈值大小,以满足不同规格的限制要求。当前已经不存在线上线程泄露影响其它作业的问题。


03性能优化

第三部分是性能优化。主要包括启动性能和调度性能等方面的改造。

3.1 Pod启动速度优化

下面是对Pod性能启动的优化。

在迁移K8S的前期发现一些问题:作业提交到K8S集群经常引发提交超时。实时平台提交阶段默认一分钟超时,如果一分钟内无法启动JM,就会判定作业失败。提交超时主要有两个原因:

  • 镜像拉取慢。Flink镜像大小超过3G,拉取镜像耗有时候会超过四五分钟。做的改进是:
  • 镜像分层。增量镜像为400MB,base镜像基本不变,只需要拉取增量镜像,就可以降低Flink拉取镜像的耗时。
  • 镜像预热。将Flink的镜像预热到机器上,在Flink启动时无需再拉取镜像。
  • Flink进程启动非常慢。无论JM或TM在Pod中启动都是通过脚本拉起,执行脚本的时间比较长,无法达到需求。启动慢的原因是Pod加载了很多环境变量。在K8S中默认开启ServiceLink,会将其它服务以环境变量的形式输入到Pod中,启动Pod后可以通过环境变量的方式访问到其它服务,就会导致一个Pod启动后有很多环境变量。在K8S中默认通过DNS服务访问,而不是通过环境变量方式,所以环境变量本身没有太大作用,也并不准确。在Host网络下真实服务的端口是在整个作业启动后才更新到REST Service中,最初的环境变量都默认为8081的端口,本身就是错误的。解决方案是默认禁用K8S中的ServiceLink。

经过以上优化,Pod的拉取或Flink进程启动耗时在2-3s,可以满足启动需求。


3.2 Flink作业启动速度优化

接着进一步对Flink On K8S做了性能优化,主要有四方面。

  1. 资源的预申请和动态申请。
  • Flink作业申请资源需要等待生成ExecutionGraph后调度时申请资源。优化是在作业启动ResourceManager后,立即申请资源,这样在作业调度申请时大多数资源已就绪。
  • 有时候申请资源时遇到部分TM申请较慢,所以又增加了动态申请。在第一次申请时比所需资源申请的更多,来覆盖节点慢的问题。在TM下RM注册足够的资源后就会拒绝其它的资源,或者取消其它资源的申请请求。
  1. 资源分发优化。SQL的依赖除了UDF都包含在Flink镜像中,所以减少了这部分耗时。UDF资源,在JM/TM启动时会异步下载这些资源,而不是等到作业提交后再进行。
  2. RPC性能优化。在Flink On YARN上作业启动提交一个task的耗时在2-3s,但是在K8S上耗时有十几秒甚至几十秒。提交慢的原因是提交过程中包含了一个注册metrics的阶段。注册metrics理论上也无需实时注册,因为指标采集是周期性的,所以可以延时注册。目前的优化是异步注册metrics,K8S上submitTask RPC性能基本与Yarn对齐。
  3. SQL编译结果缓存优化。在B站内部有大概95%的作业都是SQL作业,这部分作业在所有启动操作中占比很大,原因是可能遇到一些故障节点的迁移等类似场景。优化是将SQL编译结果进行缓存。等之后作业重启时,就无需SQL编译的过程,可以加速Flink作业的启动。

通过以上的优化,Flink On Kubernetes作业启动90线在30-35s左右(不包含状态恢复部分)。


3.3 K8S资源调度优化

下面介绍调度方面的优化。

K8S调度的问题如图所示。发现有时候单个Flink作业很多TM集中调度到机器1上,而机器2和3 TM较少。如果该作业本身是一个负载高的作业,容易引起机器1的Load或IO变高。


3.4 K8S资源调度优化

优化是增加了一个反亲和调度。亲和性是一个Pod与另一个Pod亲和,容易调度在一起。反亲和是一个Pod不与另一个Pod进行调度。单个作业所有TM尽量在整个集群中进行平均分布,可以避免一些负载集中的问题。


3.5 灵活的资源定义与弹性

在B站内部,JM的资源配置由平台统一管理,TM的配置由用户设置,用户需要什么资源量都可以自己设置。

JM的资源优化:作业启动时SQL的编译消耗CPU资源的问题,需要调大资源去加速启动。当作业启动后,JM本身不真实运行Task,负载比较低。

在Flink ON Yarn上资源粒度粗,我们的场景中最少的资源粒是4G,容易引起资源浪费。该问题在K8S上比较好解决。

K8S本身支持资源的request和limit,request是资源申请的下限,limit是使用资源的上限。在整个作业启动时可以利用limit资源加速启动,在作业运行时按照request统计资源使用量,从而解决该问题。此外内部也有一些流量很小的作业,K8S支持设置它的资源到0.5。


3.6 其它优化

下面介绍其它方面的优化。

  1. 热扩缩。支持VPA和HPA,通过REST API请求方式来实现。也有一个专门模块warship可以根据作业指标对作业进行动态热扩缩。
  2. 单点恢复。有些场景对于断流非常敏感,同时可以容忍少量数据丢失,针对失败的TM进行单点恢复来实现作业的不断流。
  3. 优先级调度。让K8S感知Flink作业SLA级别,在资源紧张时优先满足高优作业的资源申请。


04运维优化

最后介绍运维方面的优化。主要在两方面:

  1. 从YARN迁移到K8S过程的稳定性保障
  2. K8S的运维诊断改造


4.1 作业迁移

我们Flink作业从Yarn迁移到K8S上,主要使用了Flink Manager工具。它功能比较丰富,不局限于一些任务的迁移。如右图所示,有一些版本管理发布管理。

  1. 现在主要是任务队列的迁移,K8S的一个资源队列完全可以覆盖掉Yarn上的资源队列,就可以进行整个队列的迁移。迁移时会筛选出队列所有的作业,通过stop-with-savepoint或者超时下线再提交到K8S集群,再监控K8S作业的稳定性。目前监控作业的状态,核心是监控checkpoint状态。在Flink中很多Flink作业的异常都可以反映到checkpoint上,所以我们选取的指标是监控checkpoint的完成情况。
  2. 如果K8S资源不足以覆盖掉YARN的资源,则每次迁移部分机器,例如YARN上机器迁移到K8S上,会将Yarn上的label移除,节点不可调度后筛选机器上所有运行的作业,通过内部的黑名单重启迁移机器上的作业,迁移掉对应的TM container。


4.2 作业故障排查

下面介绍其他两个问题。

  1. 在K8S中,Flink作业可能有着成百上千的Pod。一旦作业失败,整个故障现场会随着Pod失败而消失,会导致故障诊断难以进行。在最初的版本中将日志采集模块放入Pod中,如果Pod失败退出,日志采集不一定会实时采集退出的日志,会导致关键的日志无法被显示。
  2. K8S Event信息有利于我们排查问题。

我们将Pod日志目录挂载到宿主机的特定目录上,这样Flink日志不会随着Pod失败而消失。K8S负责清理日志,Pod日志失败超过一天后就会被删除。K8S也会暴露出Pod Event事件,方便故障排查。


4.3 Core Dump改造

对于Linux Core Dump的改造如下。在内部有一些作业在UDF中有JNI调用,在JNI异常时会将TM Crash掉,导致作业失败。通常作业失败的原因是网络栈的异常,导致很难排查。现在做的改造是:

  1. 增加ErrorFile文件,记录整个java进程崩溃时的一些详细信息。有了ErrorFile后可以明确作业TM Crash是因为JNI异常引起的。但是只有ErrorFile无法帮助用户进行问题修复。
  2. 增加了Linux Core Dump文件。Linux Core Dump在Linux进程崩溃时默认写出进程内存镜像,生成Core Dump文件。在默认Flink模式下,进程是在Pod的前台启动,无法生成Core Dump文件。我们将Flink进程放到后台启动,这样Flink Crash后,Pod暂时不会退出。前台增加了一个启动脚本,检测Flink进程的状态,如果Flink退出,会将Pod退出。优势是可以生成Linux Core Dump文件,做一些问题的排查。通过该方式帮助用户解决了一些JNI问题。
  3. 增加了一些Core Dump事件告警。如果作业失败,可以导出JNI异常,告警模块可以直接定位到作业失败的原因。
  4. 解析整个Core Dump文件依赖Flink运行时的镜像。Core Dump文件的导出来源Flink镜像。


05未来展望

第五部分是未来展望。

未来计划在以下三个方面进行探索。

  1. 多机房容灾。配合公司的多机房高可用的规划,提升Flink在整个机房故障时的可用性。
  2. 负载均衡,通过Flink作业的资源画像,结合一些调度来实现单机负载均衡。
  3. 潮汐混部。目前Flink与Spark进行了混部,未来计划利用Flink作业的潮汐特性与其它作业进行混部,进一步提升Flink 的资源利用率。

   

活动推荐


阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:

新用户复制下方链接或者扫描二维码即可0元免费试用 Flink + Paimon

了解活动详情:https://free.aliyun.com/?pipCode=sc




来源  |  Apache Flink公众号

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
27天前
|
Kubernetes Cloud Native 安全
云原生机密计算新范式 PeerPods技术方案在阿里云上的落地和实践
PeerPods 技术价值已在阿里云实际场景中深度落地。
|
3月前
|
运维 Cloud Native 测试技术
极氪汽车云原生架构落地实践
随着极氪数字业务的飞速发展,背后的 IT 技术也在不断更新迭代。极氪极为重视客户对服务的体验,并将系统稳定性、业务功能的迭代效率、问题的快速定位和解决视为构建核心竞争力的基石。
|
1月前
|
资源调度 Kubernetes 流计算
Flink在B站的大规模云原生实践
本文基于哔哩哔哩资深开发工程师丁国涛在Flink Forward Asia 2024云原生专场的分享,围绕Flink On K8S的实践展开。内容涵盖五个部分:背景介绍、功能及稳定性优化、性能优化、运维优化和未来展望。文章详细分析了从YARN迁移到K8S的优势与挑战,包括资源池统一、环境一致性改进及隔离性提升,并针对镜像优化、Pod异常处理、启动速度优化等问题提出解决方案。此外,还探讨了多机房容灾、负载均衡及潮汐混部等未来发展方向,为Flink云原生化提供了全面的技术参考。
123 9
Flink在B站的大规模云原生实践
|
2月前
|
资源调度 Kubernetes 调度
网易游戏 Flink 云原生实践
本文分享了网易游戏在Flink实时计算领域的资源管理与架构演进经验,从Yarn到K8s云原生,再到混合云的实践历程。文章详细解析了各阶段的技术挑战与解决方案,包括资源隔离、弹性伸缩、自动扩缩容及服务混部等关键能力的实现。通过混合云架构,网易游戏显著提升了资源利用率,降低了30%机器成本,小作业计算成本下降40%,并为未来性能优化、流批一体及智能运维奠定了基础。
167 9
网易游戏 Flink 云原生实践
|
7月前
|
Cloud Native 持续交付 开发者
云原生技术在现代企业中的应用与实践####
本文深入探讨了云原生技术的核心概念及其在现代企业IT架构转型中的关键作用,通过具体案例分析展示了云原生如何促进企业的敏捷开发、高效运维及成本优化。不同于传统摘要仅概述内容,本部分旨在激发读者对云原生领域的兴趣,强调其在加速数字化转型过程中的不可或缺性,为后续详细论述奠定基础。 ####
|
4月前
|
存储 人工智能 缓存
AI变革药物研发:深势科技的云原生实践之路
阿里云助力深势科技推出创新的玻尔Bohrium®科研云平台和Hermite®药物计算设计平台,并持续完善。这两项先进的工业设计与仿真基础设施成果通过AI技术赋能科学研究和工业研发,不仅大幅缩短了药物研发周期,降低了成本,还显著提高了研发成功率,为生物医药行业带来了前所未有的变革,这是AI for Science领域的重大突破。
316 38
|
3月前
|
Cloud Native Serverless 流计算
云原生时代的应用架构演进:从微服务到 Serverless 的阿里云实践
云原生技术正重塑企业数字化转型路径。阿里云作为亚太领先云服务商,提供完整云原生产品矩阵:容器服务ACK优化启动速度与镜像分发效率;MSE微服务引擎保障高可用性;ASM服务网格降低资源消耗;函数计算FC突破冷启动瓶颈;SAE重新定义PaaS边界;PolarDB数据库实现存储计算分离;DataWorks简化数据湖构建;Flink实时计算助力风控系统。这些技术已在多行业落地,推动效率提升与商业模式创新,助力企业在数字化浪潮中占据先机。
240 12
|
4月前
|
存储 人工智能 缓存
AI变革药物研发:深势科技的云原生实践之路
近日,阿里云助力深势科技推出创新的玻尔Bohrium®科研云平台和Hermite®药物计算设计平台,并持续完善。
AI变革药物研发:深势科技的云原生实践之路
|
4月前
|
存储 弹性计算 Cloud Native
云原生成本精细化管理实践:企迈科技的成本中心建设之路
企迈实施成本中心建设的项目核心目标不仅是实现云资源的优化配置,还要为管理层提供清晰、实时的成本数据分析,帮助管理层做出更加精准的决策。通过精细化的云成本管控,逐步实现成本降低、资源合理分配和更加高效的云产品使用。
云原生成本精细化管理实践:企迈科技的成本中心建设之路