Flink在快手的应用实践与技术演进之路-阿里云开发者社区

开发者社区> 阿里云EMR> 正文
登录阅读全文

Flink在快手的应用实践与技术演进之路

简介: Flink 在快手应用场景与规模 1. Flink 在快手应用场景 快手计算链路是从 DB/Binlog 以及 WebService Log 实时入到 Kafka 中,然后接入 Flink 做实时计算,其中包括实时 ETL、实时分析、Interval Join 以及实时训练,最后的结果存到 Druid、ES 或者 HBase 里面,后面接入一些数据应用产品;同时这一份 Kafka 数据实时 Dump 一份到 Hadoop 集群,然后接入离线计算。

Flink在快手应用场景

image

快手计算链路是从 DB/Binlog 以及 WebService Log 实时入到 Kafka 中,然后接入 Flink 做实时计算,其中包括实时 ETL、实时分析、Interval Join 以及实时训练,最后的结果存到 Druid、ES 或者 HBase 里面,后面接入一些数据应用产品;同时这一份 Kafka 数据实时 Dump 一份到 Hadoop 集群,然后接入离线计算。
image

Flink 在快手应用的类别主要分为三大类:

  • 80% 统计监控:实时统计,包括各项数据的指标,监控项报警,用于辅助业务进行实时分析和监控;
  • 15% 数据处理:对数据的清洗、拆分、Join 等逻辑处理,例如大 Topic 的数据拆分、清洗;
  • 5% 数据处理:实时业务处理,针对特定业务逻辑的实时处理,例如实时调度;

image

Flink在快手应用的典型场景包括:

  • 快手是分享短视频跟直播的平台,快手短视频、直播的质量监控是通过 Flink 进行实时统计,比如直播观众端、主播端的播放量、卡顿率、开播失败率等跟直播质量相关的多种监控指标;
  • 用户增长分析,实时统计各投放渠道拉新情况,根据效果实时调整各渠道的投放量;
  • 实时数据处理,广告展现流、点击流实时 Join,客户端日志的拆分等;
  • 直播 CDN 调度,实时监控各 CDN 厂商质量,通过 Flink 实时训练调整各个 CDN 厂商流量配比;

Flink集群规模

image

快手目前集群规模有1500台左右,作业数量大约是500左右,日处理条目数总共有1.7万亿,峰值处理条目数大约是3.7千万。集群部署都是On Yarn模式,分为离线集群和实时集群两类集群,其中离线集群混合部署,机器通过标签进行物理隔离,实时集群是 Flink 专用集群,针对隔离性、稳定性要求极高的业务部署。

快手Flink技术演进

快手 Flink 技术演进主要分为三部分:

  • 基于特定场景优化,包括 Interval Join 场景优化;
  • 稳定性改进,包括数据源控速、JobManager 稳定性、作业频繁失败;
  • 平台建设;

场景优化

Interval Join应用场景

image

Interval Join 在快手的一个应用场景是广告展现点击流实时 Join 场景:打开快手 App 可能会收到广告服务推荐的广告视频,用户有时会点击展现的广告视频。这样在后端形成两份数据流,一份是广告展现日志,一份是客户端点击日志。这两份数据需进行实时 Join,将 Join 结果作为样本数据用于模型训练,训练出的模型会被推送到线上的广告服务。

该场景下展现以后 20 分钟的点击被认为是有效点击,实时 Join 逻辑则是点击数据 Join 过去 20 分钟展现。其中,展现流的数据量相对比较大,20 分钟数据在 1 TB 以上。最初实时 Join 过程是业务自己实现,通过 Redis 缓存广告展现日志,Kafka 延迟消费客户端点击日志实现 Join 逻辑,该方式缺点是实时性不高,并且随着业务增长需要堆积更多机器,运维成本非常高。基于 Flink 使用 Interval Join 完美契合此场景,并且实时性高,能够实时输出 Join 后的结果数据,对业务来说维护成本非常低,只需要维护一个 Flink 作业即可。

Interval Join场景优化

image

Interval Join原理:

Flink 实现Interval join的原理:两条流数据缓存在内部State中,任意一数据到达,获取对面流相应时间范围数据,执行 joinFunction进行Join。随着时间的推进,State中两条流相应时间范围的数据会被清理。

在前面提到的广告应用场景Join过去20分钟数据,假设两个流的数据完全有序到达,Stream A作为展现流缓存过去20分钟数据,Stream B 作为点击流每来一条数据到对面Join过去20分钟数据即可。

Flink 实现 Interval Join:

KeyedStreamA.intervalJoin(KeyedStreamB)

     .between(Time.minutes(0),Time.minutes(20))
     .process(joinFunction)

状态存储策略选择

image

关于状态存储策略选择,生产环境状态存储Backend有两种方式:

  • FsStateBackend:State存储在内存,Checkpoint时持久化到HDFS;
  • RocksDBStateBackend:State存储在RocksDB 实例,可增量Checkpoint,适合超大State。在广告场景下展现流20分钟数据有1 TB以上,从节省内存等方面综合考虑,快手最终选择的是RocksDBStateBackend;

在 Interval join场景下,RocksDB状态存储方式是将两个流的数据存在两个Column Family里,RowKey根据keyGroupId+joinKey+ts方式组织。

RocksDB访问性能问题

image

Flink作业上线遇到的第一个问题是RocksDB访问性能问题,表现为:

  • 作业在运行一段时间之后出现反压,吞吐下降;
  • 通过 Jstack 发现程序逻辑频繁处于 RocksDB get 请求处;
  • 通过 Top 发现存在单线程 CPU 持续被打满;

进一步对问题分析,发现:该场景下,Flink 内部基于 RocksDB State 状态存储时,获取某个 Join key 值某段范围的数据,是通过前缀扫描的方式获取某个 Join key 前缀的 entries 集合,然后再判断哪些数据在相应的时间范围内。前缀扫描的方式会导致扫描大量的无效数据,扫描的数据大多缓存在 PageCache 中,在 Decode 数据判断数据是否为 Delete 时,消耗大量 CPU。

以上图场景为例,蓝色部分为目标数据,红色部分为上下边界之外的数据,前缀扫描时会过多扫描红色部分无用数据,在对该大量无效数据做处理时,将单线程 CPU 消耗尽。

针对RocksDB访问性能优化

image

快手在Interval join该场景下对RocksDB的访问方式做了以下优化:

  • 在Intervaljoin 场景下,是可以精确的确定需访问的数据边界范围。所以用全 Key 范围扫描代替前缀扫描,精确拼出查询上下边界 Full Key 即 keyGroupId+joinKey+ts[lower,upper];
  • 范围查询 RocksDB ,可以更加精确 Seek 到上下边界,避免无效数据扫描和校验;

优化后的效果:P99 查询时延性能提升 10 倍,即 nextKey 获取 RocksDB 一条数据, P99 时延由 1000 毫秒到 100 毫秒以内。作业吞吐反压问题进而得到解决。

RocksDB 磁盘压力问题

image

Flink 作业上线遇到的第二个问题是随着业务的增长, RocksDB 所在磁盘压力即将达到上限,高峰时磁盘 util 达到 90%,写吞吐在 150 MB/s。详细分析发现,该问题是由以下几个原因叠加导致:

  • Flink机器选型为计算型,大内存、单块HDD盘,在集群规模不是很大的情况下,单个机器会有4-5个该作业Container,同时使用一块 HDD盘;
  • RocksDB后台会频繁进行Compaction有写放大情况,同时Checkpoint也在写磁盘;

针对RocksDB磁盘压力,快手内部做了以下优化:

  • 针对RocksDB参数进行调优,目的是减少Compaction IO量。优化后IO总量有一半左右的下降;
  • 为更加方便的调整RocksDB参数,在 Flink 框架层新增 Large State RocksDB配置套餐。同时支持 RocksDBStateBackend 自定义配置各种RocksDB参数;

未来计划,考虑将State用共享存储的方式存储,进一步做到减少IO总量,并且快速Checkpoint和恢复。

稳定性改进

image

首先介绍下视频质量监控调度应用背景,有多个Kafka Topic存储短视频、直播相关质量日志,包括短视频上传/下载、直播观众端日志,主播端上报日志等。Flink Job读取相应Topic数据实时统计各类指标,包括播放量、卡顿率、黑屏率以及开播失败率等。指标数据会存到 Druid提供后续相应的报警监控以及多维度的指标分析。同时还有一条流是进行直播CDN调度,也是通过Flink Job实时训练、调整各 CDN厂商的流量配比。

以上Kafka Topic数据会同时落一份到Hadoop集群,用于离线补数据。实时计算跟离线补数据的过程共用同一份Flink代码,针对不同的数据源,分别读 Kafka数据或HDFS数据。

数据源控速

image

视频应用场景下遇到的问题是:作业 DAG 比较复杂,同时从多个 Topic 读取数据。一旦作业异常,作业失败从较早状态恢复,需要读取部分历史数据。此时,不同 Source 并发读取数据速度不可控,会导致 Window 类算子 State 堆积、作业性能变差,最终导致作业恢复失败。另外,离线补数据,从不同 HDFS 文件读数据同样会遇到读取数据不可控问题。在此之前,实时场景下临时解决办法是重置 GroupID 丢弃历史数据,使得从最新位置开始消费。

针对该问题我们希望从源头控制多个 Source 并发读取速度,所以设计了从 Source 源控速的策略。

Source 控速策略
image

Source 控速策略是 :

  • SourceTask共享速度状态提供给JobManager;
  • JobManager引入SourceCoordinator,该Coordinator拥有全局速度视角,制定相应的策略,并将限速策略下发给 SourceTask;
  • SourceTask根据JobManager下发的速度调节信息执行相应控速逻辑
  • 一个小细节是 DAG 图有子图的话, 不同子图 Source 源之间互相不影响;

Source控速策略详细细节

image

SourceTask共享状态

  • SourceTask定期汇报状态给JobManager,默认10s间隔;
  • 汇报内容为;

协调中心SourceCoordinator

  • 限速阈值:最快并发Watermark - 最慢并发 Watermark > ∆t(默认5分钟)。只要在达到限速阈值情况下,才进行限速策略制定;
  • 全局预测:各并发 targetWatermark=base+speed*time;Coordinator 先进行全局预测,预测各并发接下来时间间隔能运行到的 Watermark位置;
  • 全局决策:targetWatermark = 预测最慢 Watermark+∆t/2;Coordinator 根据全局预测结果,取预测最慢并发的 Watermark 值再浮动一个范围作为下个周期全局限速决策的目标值;
  • 限速信息下发:。将全局决策的信息下发给所有的 Source task,限速信息包括下一个目标的时间和目标的Watermark位置;

以上图为例,A时刻,4个并发分别到达如图所示位置,为A+interval的时刻做预测,图中蓝色虚线为预测各并发能够到达的位置,选择最慢的并发的Watermark位置,浮动范围值为Watermark + ∆t/2 的时间,图中鲜红色虚线部分为限速的目标 Watermark,以此作为全局决策发给下游Task。

image

SourceTask 限速控制

  • SourceTask 获取到限速信息后,进行限速控制;
  • 以 KafkaSource 为例,KafkaFetcher 获取数据时,根据限速信息 Check 当前进度,确定是否需要限速等待;

该方案中,还有一些其他考虑,例如:

  • 时间属性:只针对 EventTime 情况下进行限速执行;
  • 开关控制:支持作业开关控制是否开启 Source 限速策略;
  • DAG 子图 Source 源之间互相不影响;
  • 是否会影响 CheckPoint Barrier 下发;
  • 数据源发送速度不恒定,Watermark 突变情况;

Source 控速结果

image

拿线上作业,使用 Kafka 从最早位置(2 days ago)开始消费。如上图,不限速情况下 State 持续增大,最终作业挂掉。使用限速策略后,最开始 State 有缓慢上升,但是 State 大小可控,最终能平稳追上最新数据,并 State 持续在 40 G 左右。

JobManager稳定性

image

关于JobManager稳定性,遇到了两类Case,表现均为:JobManager在大并发作业场景WebUI卡顿明显,作业调度会超时。进一步分析了两种场景下的问题原因:

  • 场景一,JobManager 内存压力大问题。JobManager 需要控制删除已完成的 Checkpoint 在 HDFS 上的路径。在 NameNode 压力大时,Completed CheckPoint 路径删除慢,导致 CheckPoint Path 在内存中堆积。原来删除某一次 Checkpoint 路径策略为:每删除目录下一个文件,需 List 该目录判断是否为空,如为空将目录删除。在大的 Checkpoint 路径下, List 目录操作为代价较大的操作。针对该逻辑进行优化,删除文件时直接调用 HDFS delete(path,false) 操作,语义保持一致,并且开销小;
  • 场景二,该Case发生在Yarn Cgroup功能上线之后,JobManager G1 GC过程变慢导致阻塞应用线程。AppMaster申请CPU个数硬编码为1,在上线Cgroup之后可用的CPU资源受到限制。解决该问题的方法为,支持AppMaster申请CPU个数参数化配置;

作业频繁失败

image

机器故障造成作业频繁失败,具体的场景也有两种:

  • 场景一:磁盘问题导致作业持续调度失败。磁盘出问题导致一些 Buffer 文件找不到。又因为 TaskManager 不感知磁盘健康状况,会频繁调度作业到该 TaskManager,作业频繁失败;
  • 场景二:某台机器有问题导致 TaskManager 在某台机器上频繁出 Core,陆续分配新的 TaskManager 到这台机器上,导致作业频繁失败;

针对机器故障问题解决方法:

  • 针对磁盘问题,TaskManager 增加 DiskChecker 磁盘健康检查,发现磁盘有问题 TaskManager 自动退出;
  • 针对有些机器频繁出现 TaskManager 出现问题,根据一定的策略将有问题机器加到黑名单中,然后通过软黑名单机制,告知 Yarn 尽量不要调度 Container 到该机器;

平台化建设

平台建设:

image

快手的平台化建设主要体现在青藤作业托管平台。通过该平台可进行作业操作、作业管理以及作业详情查看等。作业操作包括提交、停止作业。作业管理包括管理作业存活、性能报警,自动拉起配置等;详情查看,包括查看作业的各类 Metric 等。

上图为青藤作业托管平台的一些操作界面。

问题定位流程优化:

image

我们也经常需要给业务分析作业性能问题,帮助业务 debug 一些问题,过程相对繁琐。所以该部分我们也做了很多工作,尽量提供更多的信息给业务,方便业务自主分析定位问题。

  • 将所有Metric入Druid,通过Superset可从各个维度分析作业各项指标;
  • 针对Flink的WebUI做了一些完善,支持Web实时打印jstack,Web DAG为各Vertex增加序号,Subtask信息中增加各并发 SubtaskId;
  • 丰富异常信息提示,针对机器宕机等特定场景信息进行明确提示;
  • 新增各种Metric;

未来计划

未来规划主要分为两个部分:

  • 目前在建设的 Flink SQL 相关工作。因为 SQL 能够减少用户开发的成本,包括我们现在也在对接实时数仓的需求,所以 Flink SQL 是我们未来计划的重要部分之一;
  • 我们希望进行一些资源上的优化。目前业务在提作业时存在需求资源及并发预估不准确的情况,可能会过多申请资源导致资源浪费。另外如何提升整体集群资源的利用率问题,也是接下来需要探索的问题;

备注:本文转载自https://www.infoq.cn/article/sEMcN3uK-3jk9EBCiUuS

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
+ 订阅

阿里巴巴开源大数据技术团队成立阿里云EMR技术圈, 每周推送前沿技术文章,直播分享经典案例、在线答疑,营造纯粹的开源大数据氛围,欢迎加入!加入钉钉群聊阿里云E-MapReduce交流2群,点击进入查看详情 https://qr.dingtalk.com/action/joingroup?code=v1,k1,cNBcqHn4TvG0iHpN3cSc1B86D1831SGMdvGu7PW+sm4=&_dt_no_comment=1&origin=11

官方博客
最新文章
相关文章
官网链接