领英如何应对Apache Spark的Scalability挑战

简介: 在集群计算引擎使用率快速增长的过程当中,会面对多维度的计算基础架构规模扩展性的挑战。同时由于Spark团队直接与Spark用户打交道,如何提升Spark用户生产力,避免“用户支持陷阱”,一直是较为头疼的问题。本次直播将由领英Spark团队软件工程师沈旻和林致远为您介绍,领英Spark 生态系统,构建多元化Spark 生态系统过程中遇到的挑战,如何提升Spark用户生成力以及如何优化Spark基础计算架构。

演讲嘉宾简介:
沈旻,领英Spark团队软件工程师,技术负责人,伊利诺伊芝加哥分校计算机专业博士学位。
林致远,领英Spark团队软件工程师,卡耐基梅隆大学硕士学位,专攻分布式系统方向。

以下内容根据演讲视频以及PPT整理而成。

点击链接观看精彩回放:https://developer.aliyun.com/live/43189

本次分享主要围绕以下四个方面:
一、Overview of Spark Ecosystem @ LinkedIn
二、Scaling challenges we have experienced
三、Solutions to scale our Spark users’productivity
四、Solutions to scale Spark compute infrastructure

一、Overview of Spark Ecosystem @ LinkedIn
本着协助成就职场人士,在职场上事半功倍的初衷与使命,领英平台已演变成了数字化的全球经济图谱,图谱中所蕴含的信息不仅为各种平台产品提供了所需的数据,也提供了数字化洞悉全球经济的途径。在这样一张庞大的图谱中分析数据获得insights才能帮助领英平台中的职场人士。那么大规模的数据平台必不可少,因此Apache Spark成为了领英的主要计算引擎。

image.png

领英Spark生态系统有两个特别显著的特性,一方面是一个大规模计算基础架构,在Hadoop平台中领英使用了上万个节点,包括近100万的vcores,以及2+PB的内存。每日大概有3万个Spark应用在集群中运行,这些Spark应用程序占用了70%的计算资源,每日shuffle超过5PB的数据。计算基础架构增长也非常迅速,仅仅2019年平均就增长了3倍以上。

image.png

领英的Spark是一个多元化的生态系统,Spark用户通过不同的方式在集群中使用Spark,要么通过Azkaban获得预定的流程,或者通过Jupyter notebook运行交互式查询。在领英,上千万用户通过Spark丰富的API开发各种大数据应用。大约60%的应用程序是Spark SQL应用,通过丰富的大数据应用,在领英涵盖了多种应用场景,包括人工智能、数据分析、A/B Testing、Data Warehouse、指标报告等等。

image.png

二、Scaling challenges we have experienced

面临挑战
快速增长的大规模基础架构所支持的多元化生态系统给领英带来了不小的难题。在扩展Spark基础架构,赋能用户,高效开发等方面遇到了不同维度的挑战。首先在资源管理方面,在上千用户的集群中管理用户,并满足不同团队计算资源的需求会带来不少集群运维的挑战。在此之上,计算引擎本身也面临这更高扩展性的问题,计算引擎快速增长意味着需要可扩展的基础架构保证稳定性,否则将大大影响用户使用的便捷性和满意度,同时会带来技术问题且会增加技术运维的开销。在用户生产力方面,当Spark使用量增加时,用户支持问题也会接踵而至,为了确保内部Spark用户的生产力,团队中更多的人力也会自动的转移到用户支持的问题上。但这同时意味着有限的团队资源无法投入到计算基础架构的改进上,而这本身又会带来更多用户支持问题。这样的恶性循环会形成阻碍团队发展的“用户支持陷阱”。

image.png

解决方案
领英展开了系列的动作来解决这些挑战。在资源管理层面,工作中心是让运维团队从集群管理工作中解脱出来,创建了以业务为导向的集群资源队列结构,使得各个部门自己管理自己的资源队列。此外还优化了集成资源调度器,帮助用户弹性管理资源,将集群中的空闲资源以弹性资源的方式分配给最需要的队列。通过这些解决方案,资源管理很大程度上变成了自动化或用户自我管理的方式。在计算引擎层面,领英花了很多精力优化Spark Shuffle,Spark Shuffle是最大的规模扩展瓶颈。领英还尝试了多种SQL优化技术,SQL优化有助于减少同等Spark应用数量下的计算工作量,从而进一步缓解在快速增长的规模下计算引擎所面临的压力。在用户生产力方面,领英的目标是拜托用户支持的陷阱,尽量通过自动化的Spark系统回答最常见的用户问题,分别是为什么我的Spark运行失败了,为什么我的Spark运行很慢,以及如何让它运行的更快。下面分别从用户生产力及Spark Shuffle方面展开详细的介绍。

image.png

三、Solutions to scale our Spark users’ productivity

提升Spark用户生产力
提升用户生产力,并帮助用户理解Spark应用的各个方面对于团队工作至关重要。鉴于Spark是非常复杂的计算引擎,对于Spark用户而言,调试和调参往往都非常繁杂。由于Spark团队资源相对有限,考虑到领英Spark规模非常庞大,如何更好的提供用户支持也是一个极大的挑战。在Spark中运行时,错误可能会出现在任何地方,用户至少需要很多步骤才能获取到相关日志,寻找出错原因,有时即使找到出错日志但想找到根本原因也不是很容易的事情。而且Spark用户花费了很多功夫终于调试好了,但运行时还是有很多不尽人意的地方,调整应用性能瓶颈,再提升性能也是一项很头疼的工作。针对用户的痛点,领英开发了一套自动化的解决方案,帮助用户查找用户出错的原因,分析并查找应用性能瓶颈,并提供各种调参建议。借助这些解决方案,可以大大提高用户生产力,同时减轻Spark团队在用户支持上的负担。

image.png

第一类问题:为什么我的Spark应用失败了?

针对第一类问题,领英开发了一套自动出错分析工具,作为领英的Spark用户,大家可以在工作流失败的情况下,到工作流管理器Azkaban上找出错原因,免去了繁琐的获取相关日志,查询出错原因的步骤。用户只需要点击异常选项卡就能直观的找到所有相关异常,同时也能看到应用运行失败的根本原因。如下图中,出错原因是内存不足,用户也可以点击链接查看完整日志。

image.png

此外,领英还做了一系列出错原因分析可视化仪表,提供了各种原因的占比,可以为Spark用户带来更多帮助。如下图,可以发现最主要的出错类型是输入数据的缺失,以及数据访问权限问题。通过这些信息,团队可以更方便的知道其故障原因所在,从而采取相应措施。

image.png

自动故障分析可以时运维团队收益。下图是集群上各种错误知识趋势,包括网络问题和数据Shuffle问题等等。这种趋势监控对集群健康至关重要,有助于提早发现异常行为,识别集群中的潜在问题。

image.png

第二类问题:如何找到运行时的性能瓶颈?

针对第二类问题,领英为此开发了性能分析工具GridBench,它可以通过各种报告帮助用户理解性能指标,可以对同一个Spark应用多次运行后的结果自动分析,从而发现性能瓶颈点。GridBench也可以作为很好的衡量工具,帮助用户了解存储和计算模块的性能指标。下图中展示了GridBench针对某个应用做出的性能比较报告,通过对比两组不同时间下运行间的之间执行记录。GridBench可以确定应用性能是否有变化,可以发现Executer CPU Time有了明显的提升,直观的告诉用户性能瓶颈所在,在优化性能时更加的有针对性。

image.png

第三类问题:如何调参,使得应用运行的更快?

针对第三类问题,领英为用户提供了自动化参数调优建议。通过一系列预定义调参方案,自动化检查应用配置,资源设置等等,给出对应的建议。如下图,某个调优方案显示的是红色,表明有进一步优化的空间,如果显示绿色表示参数设置已经比较合理。下图中应用的内存设置太高,建议将其设置为较低的值,避免资源浪费。

image.png

解决方案设计框架
上述解决方案背后的设计框架中,各种Spark应用指标是框架核心元素,追踪集群里运行的每个指标,将指标汇总成一个数据集,数据集会被前面提到的自动化错误分析,性能分析及调参建议工具使用到。在设计框架中,用Spark History Server获取所需要的数据,它是领英Spark生态系统中重要的一环,为所有Spark应用提供历史日志记录,通过网页和Rest API的方式提供Spark 应用的详细数据信息。再通过Metric API获取指标数据。

image.png

Spark History Server
当日均Spark应用数据达到上万个时,通过Spark History Server获取每个指标数据,还是遇到了功能扩展问题。一方面在集群高峰时段,单位时间内结束的Spark应用数量比较大,Spark History Server不能很好的处理大量的并发请求。其次,Spark History Server在解析历史日志,提取较大的日志文件时会非常耗时,影响应用指标的及时性。为了解决并发请求问题,设计了分布式的Spark History Server,架构中包含一个proxy server和多个worker server,通过使用多台服务器可以很好的横向扩展。为了解决第二个问题,设计了增量Spark History Server解析(Incremental Parsing)。一般情况下,在Spark应用运行结束后才解析历史日志,Incremental Parsing可以在运行时开始解析,一点点的增量解析日志文件。当Spark应用结束后,Spark History Server Incremental Parsing可以在很短的时间内提供所需要的指标数据。目前大约只需要20秒,Spark History Server Incremental Parsing就可以完成对99%Spark应用的日志解析。

image.png

通过对Spark History Server扩展性和实时性的提升,得以以较低延迟提供所有Spark应用指标的数据。为了驱动各种用户生产力数据,领英搭建了一套基于Kafka 和samza的Spark Tracking Service。基于Kafka 和samza是领英开源的流处理系统,Spark Tracking Service首先会读取来自集群Resource Manager的数据流,获取运行结束时的Spark应用ID,查询Spark History Server 以获取每个Spark应用的数据,在对原始数据处理后Spark Tracking Service会进一步解析用户所需要的指标数据。由此,前面提到的自动故障分析、性能分析及调参工具就有了数据来源。

image.png

四、Solutions to scale Spark compute infrastructure

Spark Shuffle Service
有了提升用户生产力的各种工具之后,Spark团队可以更多的投入的优化计算引擎之上。Spark本身是一个复杂的系统,应该首先改进哪个组件呢?随着Spark在领英内部使用率的快速增长,Spark Shuffle Service成为了最先扩展瓶颈的的Spark组件之一。领英使用了External Spark Shuffle Service管理Shuffle文件,启用Spark动态资源分配功能,这种配置对多租户集群中Spark应用间的公平资源共享至关重要。在这样的部署中,集群中的每个计算节点都将部署一个Spark Shuffle Service,每个Spark Executer在启动时会和本地的Spark Shuffle Service对接,并提供注册信息。之后Spark Executer中Shuffle Map Tasks 会生成Shuffle文件,每个文件都包含对应不同Shuffle分区的Shuffle Block,Shuffle文件被External Spark Shuffle Service统一管理。当Shuffle Reducer Tasks开始运行时,都会从远程的Shuffle Service当中获取相应的Shuffle Block。在繁忙的生成集群当中,单个Shuffle Service可以轻易的接收到数千个Shuffle并发连接,这些连接来自数十个应用中的Shuffle Reducer Tasks。由于 Spark Shuffle Service共享性质,在大规模部署应用服务时遇到了很多问题。

image.png

Spark Shuffle Service问题
首先是Shuffle可靠性问题,在生成集群当中,在集群高峰时段Reducer Tasks经常无法与Shuffle进行连接,连接失败将导致Shuffle Block的获取失败。这种问题导致工作流中的SLA无法满足,甚至运行失败。在此之外,还遇到了Shuffle效率问题,在集群当中,Shuffle文件存储在硬盘之上,由于Reducer Tasks请求陆续发出,Shuffle Service也将访问数据,如果Shuffle Block大小很小,那么Shuffle Service生成的少数据随机获取操作将严重硬盘的数据吞吐量,从而延长Shuffle等待时间。第三个问题是Shuffle规模扩展性问题,由于Shuffle Service的共享属性,一个需要Shuffle很多小Blocks的应用,在获取Shuffle Block时很容易对Shuffle Service造成过大压力,导致性能的下降。这不仅影响对Shuffle不友好的应用,还会影响共享同一个Shuffle Service的相邻应用。对于这些应用而言,调整Shuffle Block并不容易,这种现象发生时也会导致其它正常应用运行时间的延长。

image.png

下图很直观的展示了小Shuffle Block带来的问题,图中采用了5000个Shuffle Reduce Stage,并在图中展示了每个Stage平均Shuffle Block的大小,以及每个任务的Shuffle等待时间。数据来源是领英2019年生产集群当中所运行的Spark应用。可以发现,经历了较长Shuffle等待时间的Stage,大多数也是应用了小Shuffle Block Stage。

image.png

Stage1:提升Shuffle Service可靠性
为了解决这些问题,领英分了三个阶段来系统性的解决集群中的Spark Shuffle组件。第一阶段就是提升Shuffle Service在集群高峰时段的可靠性。通过前面介绍的集群故障分析工具,发现在集群中Shuffle失败的主要原因与网络故障并没有特别大的关系。而是由于Shuffle Service在处理Shuffle验证请求时超时。这个问题日均达到了1000次,某个日子甚至达到大约6000次。

image.png

经过更进一步的研究,发现这是因为Shuffle Service背后使用的Next-gen服务器的问题。在较轻量层的控制层RPC,如身份验证请求并没有很好的与更加耗时的数据处理RPC隔离开来,在集群高峰时段,大量的数据层的RPC占用了Shuffle Service的处理时间,直接导致控制层RPC请求超时。领英修复了这个问题,在集群上部署了改进后的Shuffle Service之后,看到了立杆见影的效果。大大减少了此类问题的出现,提升了集群中Shuffle组件的可靠性。

image.png

Stage2:Shuffle Service端限流机制
在下一阶段,着重于对Shuffle Service端限流来帮助解决集群内abusive应用的影响。这类应用所带来的最大的负面影响是Shuffle Block索取的速度。这些abusive应用会生成大量的小Shuffle Block,因此当Reducer Tasks获取Shuffle Block时,Shuffle Shuffle会开始大量的小数据随机读取操作,很容易导致硬盘带宽过载。在领英生产环境中,abusive应用会延长Shuffle获取等待时间。领英开发了Shuffle Service限流机制,实时追踪每个连接的应用Shuffle Block获取速率,当Shuffle Block获取速率超过阈值时,Shuffle Service可以让相应应用的Reducer Tasks回退,通过减少并发Shuffle Block获取数据流的方式作出响应。

image.png

领英最近在生产环境中使用了Shuffle Service限流机制,观察到集群中所有节点上的Shuffle Service Block获取速率的波动幅度明显降低,这意味着abusive job对Shuffle Service以及相邻应用的影响开始受到了控制。同时,还观察到集群上Shuffle数据传输速率并没有特别明显的变化。这意味着当限制那些少数abusive应用时并不会伤害整个集群Shuffle数据吞吐量。

image.png

Stage3:Magnet
尽管限流可以保护Shuffle Service免受abusive应用的影响,但依然不能根本的解决小Shuffle Block的问题。受到限流的作业可能是高优先级的作业,并且有严格的SLA要求,这些应用无法承受Shuffle Service受导致的运行时间的影响。另一方面,挑战这些应用的参数,以增加Shuffle Block大小也不容易,这会导致任务处理数据量的增加,同时对该应用的其它方面产生影响。为了解决这个问题,领英对Shuffle Service采取了根本的改进,设计并实现了Magnet,一种在Spark之上的全新的push base Shuffle实现方式。相关工作论文也被VLDB2020接收,目前领英也准备将这方面工作贡献至开源社区。更详细的Magnet可以关注后续工程博客文章。

image.png

Magnet采用了推送和合并的Shuffle机制,生成Shuffle文件之后,Map Tasks会将生成的Shuffle Block划分为多个组,每个组包含了连续的Shuffle byte组成的数据,分组之后另外单独的线程会读取一整个组,将其中的Shuffle Block传输到远程的Shuffle Service中。Shuffle Service中Shuffle Block按照不同的Shuffle分区被合并。Shuffle Driver会在Shuffle Map Stage一开始会选择一系列的Shuffle Service,每个Map Tasks都将收到同样的一系列Services,这样可以确保属于同一个分区的Shuffle Block始终被分配到同一个远程的Shuffle Service上。在Shuffle Service端将以Best effort方式把收到Shuffle Block按分区合并至对应的Shuffle文件当中。

image.png

当推送和合并的过程完成时,除了原本并没有被合并的Shuffle Block大小和位置之外,Spark Driver也会收到这些分区合并的Shuffle文件大小和位置。当Reducer Tasks开始运行时,通过Spark Driver查询所需Shuffle Block的位置和大小,这将大大减少Reducer Tasks所需获取的Shuffle Block数量,从而避免小Shuffle Block的问题。尽管Shuffle Service端以Best effort方式把收到Shuffle Block按分区合并,会导致小部分Shuffle Block没有被合并,Reducer Tasks仍然可以获取那些没有被合并的Shuffle Block,从而保证数据的完整性。此外,由于目前Reducer Tasks大部分输入数据都被合并在集群的一个节点之上,Spark Driver在调用Reducer Tasks时考虑到这点,有助于进一步提高Shuffle性能。

image.png

基于Magnet的Shuffle过程中,Map Tasks生成的Shuffle Block会推送的远程Shuffle Service当中,并按照不同的Shuffle分区合并,这个过程有助于将Shuffle当中的小数据随机读取操作转化为大数据的顺序读取操作。此外,Shuffle Block合并的过程,相当于为Shuffle中间数据创造两个副本,有助于进一步提高Shuffle的可靠性。同时Reducer Tasks调取合并后的Shuffle分区的位置,有助于进一步提高Shuffle的性能。

image.png

下图所示,可以观察到基于Magnet的生产性能有了显著的提升。使用Gridbench性能分析工具,对同一个Spark应用,在使用Magnet后的性能进行了分析。下图中使用了较为复杂的生成机器学习特征数据的生产流程,处理了集群中的真实数据。原本在应用执行过程中,占据较大比重的是Shuffle获取等待时间被极大的缩短,带来了接近30%的应用运行时间的缩短。目前,领英正在将这种全新的Shuffle机制推广到生产集群中。

image.png

相关实践学习
数据湖构建DLF快速入门
本教程通过使⽤数据湖构建DLF产品对于淘宝用户行为样例数据的分析,介绍数据湖构建DLF产品的数据发现和数据探索功能。
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
3月前
|
机器学习/深度学习 SQL 分布式计算
Apache Spark 的基本概念和在大数据分析中的应用
介绍 Apache Spark 的基本概念和在大数据分析中的应用
159 0
|
3月前
|
机器学习/深度学习 SQL 分布式计算
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
|
6月前
|
分布式计算 资源调度 Java
|
6月前
|
SQL 分布式计算 Java
|
6月前
|
分布式计算 Hadoop 大数据
大数据Hadoop之——Apache Hudi 数据湖实战操作(Spark,Flink与Hudi整合)
大数据Hadoop之——Apache Hudi 数据湖实战操作(Spark,Flink与Hudi整合)
|
1月前
|
消息中间件 分布式计算 Serverless
CDC一键入湖:当 Apache Hudi DeltaStreamer 遇见 Serverless Spark
CDC一键入湖:当 Apache Hudi DeltaStreamer 遇见 Serverless Spark
53 2
|
3月前
|
SQL 分布式计算 数据处理
Apache Spark简介与历史发展
Apache Spark简介与历史发展
|
4月前
|
SQL 分布式计算 Apache
流数据湖平台Apache Paimon(六)集成Spark之DML插入数据
流数据湖平台Apache Paimon(六)集成Spark之DML插入数据
82 0
|
4月前
|
SQL 分布式计算 Apache
流数据湖平台Apache Paimon(五)集成 Spark 引擎
流数据湖平台Apache Paimon(五)集成 Spark 引擎
75 0
|
4月前
|
SQL 分布式计算 数据处理
[AIGC] Apache Spark 简介
[AIGC] Apache Spark 简介

推荐镜像

更多