Flink在快手的应用实践与技术演进之路

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: Flink 在快手应用场景与规模 1. Flink 在快手应用场景 快手计算链路是从 DB/Binlog 以及 WebService Log 实时入到 Kafka 中,然后接入 Flink 做实时计算,其中包括实时 ETL、实时分析、Interval Join 以及实时训练,最后的结果存到 Druid、ES 或者 HBase 里面,后面接入一些数据应用产品;同时这一份 Kafka 数据实时 Dump 一份到 Hadoop 集群,然后接入离线计算。

Flink在快手应用场景

image

快手计算链路是从 DB/Binlog 以及 WebService Log 实时入到 Kafka 中,然后接入 Flink 做实时计算,其中包括实时 ETL、实时分析、Interval Join 以及实时训练,最后的结果存到 Druid、ES 或者 HBase 里面,后面接入一些数据应用产品;同时这一份 Kafka 数据实时 Dump 一份到 Hadoop 集群,然后接入离线计算。
image

Flink 在快手应用的类别主要分为三大类:

  • 80% 统计监控:实时统计,包括各项数据的指标,监控项报警,用于辅助业务进行实时分析和监控;
  • 15% 数据处理:对数据的清洗、拆分、Join 等逻辑处理,例如大 Topic 的数据拆分、清洗;
  • 5% 数据处理:实时业务处理,针对特定业务逻辑的实时处理,例如实时调度;

image

Flink在快手应用的典型场景包括:

  • 快手是分享短视频跟直播的平台,快手短视频、直播的质量监控是通过 Flink 进行实时统计,比如直播观众端、主播端的播放量、卡顿率、开播失败率等跟直播质量相关的多种监控指标;
  • 用户增长分析,实时统计各投放渠道拉新情况,根据效果实时调整各渠道的投放量;
  • 实时数据处理,广告展现流、点击流实时 Join,客户端日志的拆分等;
  • 直播 CDN 调度,实时监控各 CDN 厂商质量,通过 Flink 实时训练调整各个 CDN 厂商流量配比;

Flink集群规模

image

快手目前集群规模有1500台左右,作业数量大约是500左右,日处理条目数总共有1.7万亿,峰值处理条目数大约是3.7千万。集群部署都是On Yarn模式,分为离线集群和实时集群两类集群,其中离线集群混合部署,机器通过标签进行物理隔离,实时集群是 Flink 专用集群,针对隔离性、稳定性要求极高的业务部署。

快手Flink技术演进

快手 Flink 技术演进主要分为三部分:

  • 基于特定场景优化,包括 Interval Join 场景优化;
  • 稳定性改进,包括数据源控速、JobManager 稳定性、作业频繁失败;
  • 平台建设;

场景优化

Interval Join应用场景

image

Interval Join 在快手的一个应用场景是广告展现点击流实时 Join 场景:打开快手 App 可能会收到广告服务推荐的广告视频,用户有时会点击展现的广告视频。这样在后端形成两份数据流,一份是广告展现日志,一份是客户端点击日志。这两份数据需进行实时 Join,将 Join 结果作为样本数据用于模型训练,训练出的模型会被推送到线上的广告服务。

该场景下展现以后 20 分钟的点击被认为是有效点击,实时 Join 逻辑则是点击数据 Join 过去 20 分钟展现。其中,展现流的数据量相对比较大,20 分钟数据在 1 TB 以上。最初实时 Join 过程是业务自己实现,通过 Redis 缓存广告展现日志,Kafka 延迟消费客户端点击日志实现 Join 逻辑,该方式缺点是实时性不高,并且随着业务增长需要堆积更多机器,运维成本非常高。基于 Flink 使用 Interval Join 完美契合此场景,并且实时性高,能够实时输出 Join 后的结果数据,对业务来说维护成本非常低,只需要维护一个 Flink 作业即可。

Interval Join场景优化

image

Interval Join原理:

Flink 实现Interval join的原理:两条流数据缓存在内部State中,任意一数据到达,获取对面流相应时间范围数据,执行 joinFunction进行Join。随着时间的推进,State中两条流相应时间范围的数据会被清理。

在前面提到的广告应用场景Join过去20分钟数据,假设两个流的数据完全有序到达,Stream A作为展现流缓存过去20分钟数据,Stream B 作为点击流每来一条数据到对面Join过去20分钟数据即可。

Flink 实现 Interval Join:

KeyedStreamA.intervalJoin(KeyedStreamB)

     .between(Time.minutes(0),Time.minutes(20))
     .process(joinFunction)

状态存储策略选择

image

关于状态存储策略选择,生产环境状态存储Backend有两种方式:

  • FsStateBackend:State存储在内存,Checkpoint时持久化到HDFS;
  • RocksDBStateBackend:State存储在RocksDB 实例,可增量Checkpoint,适合超大State。在广告场景下展现流20分钟数据有1 TB以上,从节省内存等方面综合考虑,快手最终选择的是RocksDBStateBackend;

在 Interval join场景下,RocksDB状态存储方式是将两个流的数据存在两个Column Family里,RowKey根据keyGroupId+joinKey+ts方式组织。

RocksDB访问性能问题

image

Flink作业上线遇到的第一个问题是RocksDB访问性能问题,表现为:

  • 作业在运行一段时间之后出现反压,吞吐下降;
  • 通过 Jstack 发现程序逻辑频繁处于 RocksDB get 请求处;
  • 通过 Top 发现存在单线程 CPU 持续被打满;

进一步对问题分析,发现:该场景下,Flink 内部基于 RocksDB State 状态存储时,获取某个 Join key 值某段范围的数据,是通过前缀扫描的方式获取某个 Join key 前缀的 entries 集合,然后再判断哪些数据在相应的时间范围内。前缀扫描的方式会导致扫描大量的无效数据,扫描的数据大多缓存在 PageCache 中,在 Decode 数据判断数据是否为 Delete 时,消耗大量 CPU。

以上图场景为例,蓝色部分为目标数据,红色部分为上下边界之外的数据,前缀扫描时会过多扫描红色部分无用数据,在对该大量无效数据做处理时,将单线程 CPU 消耗尽。

针对RocksDB访问性能优化

image

快手在Interval join该场景下对RocksDB的访问方式做了以下优化:

  • 在Intervaljoin 场景下,是可以精确的确定需访问的数据边界范围。所以用全 Key 范围扫描代替前缀扫描,精确拼出查询上下边界 Full Key 即 keyGroupId+joinKey+ts[lower,upper];
  • 范围查询 RocksDB ,可以更加精确 Seek 到上下边界,避免无效数据扫描和校验;

优化后的效果:P99 查询时延性能提升 10 倍,即 nextKey 获取 RocksDB 一条数据, P99 时延由 1000 毫秒到 100 毫秒以内。作业吞吐反压问题进而得到解决。

RocksDB 磁盘压力问题

image

Flink 作业上线遇到的第二个问题是随着业务的增长, RocksDB 所在磁盘压力即将达到上限,高峰时磁盘 util 达到 90%,写吞吐在 150 MB/s。详细分析发现,该问题是由以下几个原因叠加导致:

  • Flink机器选型为计算型,大内存、单块HDD盘,在集群规模不是很大的情况下,单个机器会有4-5个该作业Container,同时使用一块 HDD盘;
  • RocksDB后台会频繁进行Compaction有写放大情况,同时Checkpoint也在写磁盘;

针对RocksDB磁盘压力,快手内部做了以下优化:

  • 针对RocksDB参数进行调优,目的是减少Compaction IO量。优化后IO总量有一半左右的下降;
  • 为更加方便的调整RocksDB参数,在 Flink 框架层新增 Large State RocksDB配置套餐。同时支持 RocksDBStateBackend 自定义配置各种RocksDB参数;

未来计划,考虑将State用共享存储的方式存储,进一步做到减少IO总量,并且快速Checkpoint和恢复。

稳定性改进

image

首先介绍下视频质量监控调度应用背景,有多个Kafka Topic存储短视频、直播相关质量日志,包括短视频上传/下载、直播观众端日志,主播端上报日志等。Flink Job读取相应Topic数据实时统计各类指标,包括播放量、卡顿率、黑屏率以及开播失败率等。指标数据会存到 Druid提供后续相应的报警监控以及多维度的指标分析。同时还有一条流是进行直播CDN调度,也是通过Flink Job实时训练、调整各 CDN厂商的流量配比。

以上Kafka Topic数据会同时落一份到Hadoop集群,用于离线补数据。实时计算跟离线补数据的过程共用同一份Flink代码,针对不同的数据源,分别读 Kafka数据或HDFS数据。

数据源控速

image

视频应用场景下遇到的问题是:作业 DAG 比较复杂,同时从多个 Topic 读取数据。一旦作业异常,作业失败从较早状态恢复,需要读取部分历史数据。此时,不同 Source 并发读取数据速度不可控,会导致 Window 类算子 State 堆积、作业性能变差,最终导致作业恢复失败。另外,离线补数据,从不同 HDFS 文件读数据同样会遇到读取数据不可控问题。在此之前,实时场景下临时解决办法是重置 GroupID 丢弃历史数据,使得从最新位置开始消费。

针对该问题我们希望从源头控制多个 Source 并发读取速度,所以设计了从 Source 源控速的策略。

Source 控速策略
image

Source 控速策略是 :

  • SourceTask共享速度状态提供给JobManager;
  • JobManager引入SourceCoordinator,该Coordinator拥有全局速度视角,制定相应的策略,并将限速策略下发给 SourceTask;
  • SourceTask根据JobManager下发的速度调节信息执行相应控速逻辑
  • 一个小细节是 DAG 图有子图的话, 不同子图 Source 源之间互相不影响;

Source控速策略详细细节

image

SourceTask共享状态

  • SourceTask定期汇报状态给JobManager,默认10s间隔;
  • 汇报内容为;

协调中心SourceCoordinator

  • 限速阈值:最快并发Watermark - 最慢并发 Watermark > ∆t(默认5分钟)。只要在达到限速阈值情况下,才进行限速策略制定;
  • 全局预测:各并发 targetWatermark=base+speed*time;Coordinator 先进行全局预测,预测各并发接下来时间间隔能运行到的 Watermark位置;
  • 全局决策:targetWatermark = 预测最慢 Watermark+∆t/2;Coordinator 根据全局预测结果,取预测最慢并发的 Watermark 值再浮动一个范围作为下个周期全局限速决策的目标值;
  • 限速信息下发:。将全局决策的信息下发给所有的 Source task,限速信息包括下一个目标的时间和目标的Watermark位置;

以上图为例,A时刻,4个并发分别到达如图所示位置,为A+interval的时刻做预测,图中蓝色虚线为预测各并发能够到达的位置,选择最慢的并发的Watermark位置,浮动范围值为Watermark + ∆t/2 的时间,图中鲜红色虚线部分为限速的目标 Watermark,以此作为全局决策发给下游Task。

image

SourceTask 限速控制

  • SourceTask 获取到限速信息后,进行限速控制;
  • 以 KafkaSource 为例,KafkaFetcher 获取数据时,根据限速信息 Check 当前进度,确定是否需要限速等待;

该方案中,还有一些其他考虑,例如:

  • 时间属性:只针对 EventTime 情况下进行限速执行;
  • 开关控制:支持作业开关控制是否开启 Source 限速策略;
  • DAG 子图 Source 源之间互相不影响;
  • 是否会影响 CheckPoint Barrier 下发;
  • 数据源发送速度不恒定,Watermark 突变情况;

Source 控速结果

image

拿线上作业,使用 Kafka 从最早位置(2 days ago)开始消费。如上图,不限速情况下 State 持续增大,最终作业挂掉。使用限速策略后,最开始 State 有缓慢上升,但是 State 大小可控,最终能平稳追上最新数据,并 State 持续在 40 G 左右。

JobManager稳定性

image

关于JobManager稳定性,遇到了两类Case,表现均为:JobManager在大并发作业场景WebUI卡顿明显,作业调度会超时。进一步分析了两种场景下的问题原因:

  • 场景一,JobManager 内存压力大问题。JobManager 需要控制删除已完成的 Checkpoint 在 HDFS 上的路径。在 NameNode 压力大时,Completed CheckPoint 路径删除慢,导致 CheckPoint Path 在内存中堆积。原来删除某一次 Checkpoint 路径策略为:每删除目录下一个文件,需 List 该目录判断是否为空,如为空将目录删除。在大的 Checkpoint 路径下, List 目录操作为代价较大的操作。针对该逻辑进行优化,删除文件时直接调用 HDFS delete(path,false) 操作,语义保持一致,并且开销小;
  • 场景二,该Case发生在Yarn Cgroup功能上线之后,JobManager G1 GC过程变慢导致阻塞应用线程。AppMaster申请CPU个数硬编码为1,在上线Cgroup之后可用的CPU资源受到限制。解决该问题的方法为,支持AppMaster申请CPU个数参数化配置;

作业频繁失败

image

机器故障造成作业频繁失败,具体的场景也有两种:

  • 场景一:磁盘问题导致作业持续调度失败。磁盘出问题导致一些 Buffer 文件找不到。又因为 TaskManager 不感知磁盘健康状况,会频繁调度作业到该 TaskManager,作业频繁失败;
  • 场景二:某台机器有问题导致 TaskManager 在某台机器上频繁出 Core,陆续分配新的 TaskManager 到这台机器上,导致作业频繁失败;

针对机器故障问题解决方法:

  • 针对磁盘问题,TaskManager 增加 DiskChecker 磁盘健康检查,发现磁盘有问题 TaskManager 自动退出;
  • 针对有些机器频繁出现 TaskManager 出现问题,根据一定的策略将有问题机器加到黑名单中,然后通过软黑名单机制,告知 Yarn 尽量不要调度 Container 到该机器;

平台化建设

平台建设:

image

快手的平台化建设主要体现在青藤作业托管平台。通过该平台可进行作业操作、作业管理以及作业详情查看等。作业操作包括提交、停止作业。作业管理包括管理作业存活、性能报警,自动拉起配置等;详情查看,包括查看作业的各类 Metric 等。

上图为青藤作业托管平台的一些操作界面。

问题定位流程优化:

image

我们也经常需要给业务分析作业性能问题,帮助业务 debug 一些问题,过程相对繁琐。所以该部分我们也做了很多工作,尽量提供更多的信息给业务,方便业务自主分析定位问题。

  • 将所有Metric入Druid,通过Superset可从各个维度分析作业各项指标;
  • 针对Flink的WebUI做了一些完善,支持Web实时打印jstack,Web DAG为各Vertex增加序号,Subtask信息中增加各并发 SubtaskId;
  • 丰富异常信息提示,针对机器宕机等特定场景信息进行明确提示;
  • 新增各种Metric;

未来计划

未来规划主要分为两个部分:

  • 目前在建设的 Flink SQL 相关工作。因为 SQL 能够减少用户开发的成本,包括我们现在也在对接实时数仓的需求,所以 Flink SQL 是我们未来计划的重要部分之一;
  • 我们希望进行一些资源上的优化。目前业务在提作业时存在需求资源及并发预估不准确的情况,可能会过多申请资源导致资源浪费。另外如何提升整体集群资源的利用率问题,也是接下来需要探索的问题;

备注:本文转载自https://www.infoq.cn/article/sEMcN3uK-3jk9EBCiUuS

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
分布式计算 数据处理 Apache
Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
【10月更文挑战第10天】Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
151 1
|
2月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
163 15
|
2月前
|
消息中间件 资源调度 API
Apache Flink 流批融合技术介绍
本文源自阿里云高级研发工程师周云峰在Apache Asia Community OverCode 2024的分享,内容涵盖从“流批一体”到“流批融合”的演进、技术解决方案及社区进展。流批一体已在API、算子和引擎层面实现统一,但用户仍需手动配置作业模式。流批融合旨在通过动态调整优化策略,自动适应不同场景需求。文章详细介绍了如何通过量化指标(如isProcessingBacklog和isInsertOnly)实现这一目标,并展示了针对不同场景的具体优化措施。此外,还概述了社区当前进展及未来规划,包括将优化方案推向Flink社区、动态调整算子流程结构等。
386 31
Apache Flink 流批融合技术介绍
|
21天前
|
消息中间件 监控 数据可视化
实时计算Flink场景实践和核心功能体验
本文详细评测了阿里云实时计算Flink版,从产品引导、文档帮助、功能满足度等方面进行了全面分析。产品界面设计友好,文档丰富实用,数据开发和运维体验优秀,具备出色的实时性和动态扩展性。同时,提出了针对业务场景的改进建议,包括功能定制化增强、高级分析功能拓展及可视化功能提升。文章还探讨了产品与阿里云内部产品及第三方工具的联动潜力,展示了其在多云架构和跨平台应用中的广阔前景。
54 9
|
22天前
|
运维 监控 安全
实时计算Flink场景实践和核心功能体验
实时计算Flink场景实践和核心功能体验
|
24天前
|
运维 数据可视化 数据处理
实时计算Flink场景实践和核心功能体验 评测
实时计算Flink场景实践和核心功能体验 评测
50 4
|
13天前
|
数据采集 运维 搜索推荐
实时计算Flink场景实践
在数字化时代,实时数据处理愈发重要。本文分享了作者使用阿里云实时计算Flink版和流式数据湖仓Paimon的体验,展示了其在电商场景中的应用,包括数据抽取、清洗、关联和聚合,突出了系统的高效、稳定和低延迟特点。
42 0
|
2月前
|
消息中间件 canal 数据采集
Flink CDC 在货拉拉的落地与实践
陈政羽在Apache Asia Community Over Code 2024上分享了《货拉拉在Flink CDC生产实践落地》。文章介绍了货拉拉业务背景、技术选型及其在实时数据采集中的挑战与解决方案,详细阐述了Flink CDC的技术优势及在稳定性、兼容性等方面的应用成果。通过实际案例展示了Flink CDC在提升数据采集效率、降低延迟等方面的显著成效,并展望了未来发展方向。
536 14
Flink CDC 在货拉拉的落地与实践
|
3月前
|
Oracle 关系型数据库 新能源
Flink CDC 在新能源制造业的实践
本文撰写自某新能源企业的研发工程师 单葛尧 老师。本文详细介绍该新能源企业的大数据平台中 CDC 技术架构选型和 Flink CDC 的最佳实践。
445 13
Flink CDC 在新能源制造业的实践
|
1月前
|
SQL 消息中间件 分布式计算
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
40 0