本文作者 李超 阿里云智能 资深技术专家
编者按
伏羲(Fuxi)是十年前最初创立飞天平台时的三大服务之一(分布式存储 Pangu,分布式计算 MaxCompute,分布式调度 Fuxi),当时的设计初衷是为了解决大规模分布式资源的调度问题(本质上是多目标的最优匹配问题)。
随阿里经济体和阿里云丰富的业务需求(尤其是双十一)和磨练,伏羲的内涵不断扩大,从单一的资源调度器(对标开源系统的YARN)扩展成大数据的核心调度服务,覆盖数据调度(Data Placement)、资源调度(Resouce Management)、计算调度(Application Manager)、和本地微(自治)调度(即正文中的单机调度)等多个领域,并在每一个细分领域致力于打造超越业界主流的差异化能力。
过去十年来,伏羲在技术能力上每年都有一定的进展和突破(如2013年的5K,15年的Sortbenchmark世界冠军,17年的超大规模离在/在离混布能力,2019年的 Yugong 发布并论文被VLDB接受等等)。本文试从面向大数据/云计算的调度挑战出发,介绍各个子领域的关键进展,并回答什么是“伏羲 2.0”。
1. 引言
过去10年,是云计算的10年,伴随云计算的爆炸式增长,大数据行业的工作方式也发生了很大的变化:从传统的自建自运维hadoop集群,变成更多的依赖云上的弹性低成本计算资源。海量大数据客户的信任和托付,对阿里大数据系统来说,是很大的责任,但也催生出了大规模、多场景、低成本、免运维的MaxCompute通用计算系统。
同样的10年,伴随着阿里年年双11,MaxCompute同样支撑了阿里内部大数据的蓬勃发展,从原来的几百台,到现在的10万台物理机规模。
双线需求,殊途同归,海量资源池,如何自动匹配到大量不同需求的异地客户计算需求上,需要调度系统的工作。本文主要介绍阿里大数据的调度系统FUXI往2.0的演进。先给大家介绍几个概念:
- 首先,数据从哪里来?数据往往伴随着在线业务系统产生。而在线系统,出于延迟和容灾的考虑,往往遍布北京、上海、深圳等多个地域,如果是跨国企业,还可能遍布欧美等多个大陆的机房。这也造成了我们的数据天然分散的形态。而计算,也可能发生在任意一个地域和机房。可是网络,是他们中间的瓶颈,跨地域的网络,在延迟和带宽上,远远无法满足大数据计算的需求。如何平衡计算资源、数据存储、跨域网络这几点之间的平衡,需要做好“数据调度”。
- 其次,有了数据,计算还需要CPU,内存,甚至GPU等资源,当不同的公司,或者单个公司内部不同的部门,同时需要计算资源,而计算资源紧张时,如何平衡不同的用户,不同的作业?作业也可能长短不一,重要程度不尽相同,今天和明天的需求也大相径庭。除了用户和作业,计算资源本身可能面临硬件故障,但用户不想受影响。所有这些,都需要“资源调度”。
- 有了数据和计算资源,如何完成用户的计算任务,比如一个SQL query?这需要将一个大任务,分成几个步骤,每个步骤又切分成成千上万个小任务,并行同时计算,才能体现出分布式系统的加速优势。但小任务切粗切细,在不同的机器上有快有慢,上下步骤如何交接数据,同时避开各自故障和长尾,这些都需要“计算调度”。
- 很多不同用户的不同小任务,经过层层调度,最后汇集到同一台物理机上,如何避免单机上真正运行时,对硬件资源使用的各种不公平,避免老实人吃亏。避免重要关键任务受普通任务影响,这都需要内核层面的隔离保障机制。同时还要兼顾隔离性和性能、成本的折中考虑。这都需要“单机调度”。
2013年,伏羲在飞天5K项目中对系统架构进行了第一次大重构,解决了规模、性能、利用率、容错等线上问题,并取得世界排序大赛Sortbenchmark四项冠军,这标志着Fuxi 1.0的成熟。
2019年,伏羲再次出发,从技术上对系统进行了第二次重构,发布Fuxi 2.0版本:阿里自研的新一代高性能、分布式的数据、资源、计算、单机调度系统。Fuxi 2.0进行了全面的技术升级,在全区域数据排布、去中心化调度、在线离线混合部署、动态计算等方面全方位满足新业务场景下的调度需求。
伏羲2.0成果概览
• 业内首创跨地域多数据中心的数据调度方案-Yugong,通过3%的冗余存储,节省80%的跨地域网络带宽
• 业内领先的去中心化资源调度架构,单集群支持10万服务器*10万并发job的高频调度
• 动态DAG闯入传统SQL优化盲区,TPC-DS性能提升27%,conditional join性能提升3X。
• 创新性的数据动态shuffle和全局跨级优化,取代业界磁盘shuffle;线上千万job,整体性能提升20%,成本下降15%,出错率降低一个数量级
• 在线离线规模化混合部署,在线集群利用率由10%提升到40%,双十一大促节省4200台F53资源,且同时保障在线离线业务稳定。
2. 数据调度2.0 - 跨地域的数据调度
阿里巴巴在全球都建有数据中心,每个地区每天会产生一份当地的交易订单信息,存在就近的数据中心。北京的数据中心,每天会运行一个定时任务来统计当天全球所有的订单信息,需要从其他数据中心读取这些交易数据。当数据的产生和消费不在一个数据中心时,我们称之为跨数据中心数据依赖(下文简称跨中心依赖)。
图. 阿里巴巴全球数据中心
MaxCompute上每天运行着数以千万计的作业,处理EB级别的数据。这些计算和数据分布在全球的数据中心,复杂的业务依赖关系产生了大量的跨中心依赖。相比于数据中心内的网络,跨数据中心网络(尤其是跨域的网络)是非常昂贵的,同时具有带宽小、延迟高、稳定性低的特点。比如网络延迟,数据中心内部网络的网络延迟一般在100微秒以下,而跨地域的网络延迟则高达数十毫秒,相差百倍以上。因此,如何高效地将跨中心依赖转化为数据中心内部的数据依赖,减少跨数据中心网络带宽消耗,从而降低成本、提高系统效率,对MaxCompute这样超大规模计算平台而言,具有极其重要的意义。
图. MaxCompute平台数据及依赖增长趋势
为了解决这个问题,我们在数据中心上增加了一层调度层,用于在数据中心之间调度数据和计算。这层调度独立于数据中心内部的调度,目的是实现跨地域维度上存储冗余--计算均衡--长传带宽--性能最优之间的最佳平衡。这层调度层包括跨数据中心数据缓存、业务整体排布、作业粒度调度。
首先是对访问频次高的数据进行跨数据中心缓存,在缓存空间有限的约束下,选择合适的数据进行换入换出。不同于其他缓存系统,MaxCompute的数据(分区)以表的形式组织在一起,每张表每天产生一个或多个分区,作业访问数据也有一些特殊规律,比如一般访问的是连续分区、生成时间越新的分区访问概率越大。
其次是业务的整体排布策略。数据和计算以业务为单位组织在一起(MaxCompute中称之为project),每个project被分配在一个数据中心,包括数据存储和计算作业。如果将project看做一个整体,可以根据作业对数据的依赖关系计算出project之间的相互依赖关系。如果能将有互相数据依赖的project放在一个数据中心,就可以减少跨中心依赖。但project间的依赖往往复杂且不断变化,很难有一劳永逸的排布策略,并且project排布需要对project进行整体迁移,周期较长,且需要消耗大量的带宽。
最后,当project之间的互相依赖集中在极少数几个作业上,并且作业的输入数据量远大于输出数据量时,比起数据缓存和project整体迁移,更好的办法是将这些作业调度到数据所在的数据中心,再将作业的输出远程写回原数据中心,即作业粒度调度。如何在作业运行之前就预测到作业的输入输出数据量和资源消耗,另一方面当作业调度到remote数据中心后,如何保证作业运行不会变慢,不影响用户体验,这都是作业粒度调度要解决的问题。
本质上,数据缓存、业务排布、作业粒度调度三者都在解同一个问题,即在跨地域多数据中心系统中减少跨中心依赖量、优化作业的data locality、减少网络带宽消耗。
1.2.1 跨数据中心数据缓存策略
我们首次提出了跨地域、跨数据中心数据缓存这一概念,通过集群的存储换集群间带宽,在有限的冗余存储下,找到存储和带宽最佳的tradeoff。通过深入的分析MaxCompute的作业、数据的特点,我们设计了一种高效的算法,根据作业历史的workload、数据的大小和分布,自动进行缓存的换入换出。
我们研究了多种数据缓存算法,并对其进行了对比试验,下图展示了不同缓存策略的收益,横轴是冗余存储空间,纵轴是带宽消耗。从图中可以看出,随着冗余存储的增加,带宽成本不断下降,但收益比逐渐降低,我们最终采用的k-probe算法在存储和带宽间实现了很好的平衡。
1.2.2 以project为粒度的多集群业务排布算法
随着上层业务的不断发展,业务的资源需求和数据需求也在不断变化。比如一个集群的跨中心依赖增长迅速,无法完全通过数据缓存来转化为本地读取,这就会造成大量的跨数据中心流量。因此我们需要定期对业务的排布进行分析,根据业务对计算资源、数据资源的需求情况,以及集群、机房的规划,通过业务的迁移来降低跨中心依赖以及均衡各集群压力。
下图展示了某个时刻业务迁移的收益分析:左图横轴为迁移的project数量,纵轴为带宽减少比例,可以看出大约移动60个project就可以减少约30%的带宽消耗。右图统计了不同排布下(迁移0个、20个、50个project)的最优带宽消耗,横轴为冗余存储,纵轴为带宽。
1.2.3 跨数据中心计算调度机制
我们打破了计算资源按照数据中心进行规划的限制,理论上允许作业跑在任何一个数据中心。我们将调度粒度拆解到作业粒度,根据每个作业的数据需求、资源需求,为其找到一个最合适的数据中心。在对作业进行调度之前需要知道这个作业的输入和输出,目前我们有两种方式获得这一信息,对于周期性作业,通过对作业历史运行数据进行分析推测出作业的输入输出;对于偶发的作业,我们发现其产生较大跨域流量时,动态的将其调度到数据所在的数据中心上运行。另外,调度计算还要考虑作业对计算资源的需求,防止作业全部调度到热点数据所在的数据中心,造成任务堆积。
1.3 线上效果
线上三种策略相辅相成,数据缓存主要解决周期类型作业、热数据的依赖;作业粒度调度主要解决临时作业、历史数据的依赖;并周期性地通过业务整体排布进行全局优化,用来降低跨中心依赖。整体来看,通过三种策略的共同作用,降低了约90%的跨地域数据依赖,通过约3%的冗余存储节省了超过80%的跨数据中心带宽消耗,将跨中心依赖转化为本地读取的比例提高至90%。下图以机房为单位展示了带宽的收益:
3. 资源调度2.0 - 去中心化的多调度器架构
2019年双十一,MaxCompute平台产生的数据量已接近EB级别,作业规模达到了千万,有几十亿的worker跑在几百万核的计算单元上,在超大规模(单集群超过万台),高并发的场景下,如何快速地给不同的计算任务分配资源,实现资源的高速流转,需要一个聪明的“大脑”,而这就是集群的资源管理与调度系统(简称资源调度系统)。
资源调度系统负责连接成千上万的计算节点,将数据中心海量的异构资源抽象,并提供给上层的分布式应用,像使用一台电脑一样使用集群资源,它的核心能力包括规模、性能、稳定性、调度效果、多租户间的公平性等等。一个成熟的资源调度系统需要在以下五个方面进行权衡,做到“既要又要”,非常具有挑战性。
13年的5K项目初步证明了伏羲规模化能力,此后资源调度系统不断演进,并通过MaxCompute平台支撑了阿里集团的大数据计算资源需求,在核心调度指标上保持着对开源系统的领先性,比如1)万台规模集群,调度延时控制在了10微秒级别,worker启动延时控制在30毫秒;2)支持任意多级租户的资源动态调节能力(支持十万级别的租户);3)极致稳定,调度服务全年99.99%的可靠性,并做到服务秒级故障恢复。
2.1 单调度器的局限性
2.1.1 线上的规模与压力
大数据计算的场景与需求正在快速增长(下图是过去几年MaxComputer平台计算和数据的增长趋势)。单集群早已突破万台规模,急需提供十万台规模的能力。
图. MaxCompute 2015 ~ 2018线上作业情况
但规模的增长将带来复杂度的极速上升,机器规模扩大一倍,资源请求并发度也会翻一番。在保持既有性能、稳定性、调度效果等核心能力不下降的前提下,可以通过对调度器持续性能优化来扩展集群规模(这也是伏羲资源调度1.0方向),但受限于单机的物理限制,这种优化总会存在天花板,因此需要从架构上优化来彻底规模和性能的可扩展性问题。
2.1.2 调度需求的多样性
伏羲支持了各种各样的大数据计算引擎,除了离线计算(SQL、MR),还包括实时计算、图计算,以及近几年迅速发展面向人工智能领域的机器学习引擎。
图. 资源调度器的架构类型
场景的不同对资源调度的需求也不相同,比如,SQL类型的作业通常体积小、运行时间短,对资源匹配的要求低,但对调度延时要求高,而机器学习的作业一般体积大、运行时间长,调度结果的好坏可能对运行时间产生直接影响,因此也能容忍通过较长的调度延时换取更优的调度结果。资源调度需求这种多样性,决定了单一调度器很难做到“面面俱到”,需要各个场景能定制各自的调度策略,并进行独立优化。
2.1.3 灰度发布与工程效率
资源调度系统是分布式系统中最复杂最重要的的模块之一,需要有严苛的生产发布流程来保证其线上稳定运行。单一的调度器对开发人员要求高,出问题之后影响范围大,测试发布周期长,严重影响了调度策略迭代的效率,在快速改进各种场景调度效果的过程中,这些弊端逐渐显现,因此急需从架构上改进,让资源调度具备线上的灰度能力,从而幅提升工程效率。
2.2 去中心化的多调度器架构
为了解决上述规模和扩展性问题,更好地满足多种场景的调度需求,同时从架构上支持灰度能力,伏羲资源调度2.0在1.0的基础上对调度架构做了大规模的重构,引入了去中心化的多调度器架构。
图. 资源调度的架构类型
我们将系统中最核心的资源管理和资源调度逻辑进行了拆分解耦,使两者同时具备了多partition的可扩展能力(如下图所示),其中:
• 资源调度器(Scheduler):负责核心的机器资源和作业资源需求匹配的调度逻辑,可以横向扩展。
• 资源管理和仲裁服务(ResourceManagerService,简称RMS):负责机器资源和状态管理,对各个Scheduler的调度结果进行仲裁,可以横向扩展。
• 调度协调服务(Coordinator):管理资源调度系统的配置信息,Meta信息,以及对机器资源、Scheduler、RMS的可用性和服务角色间的可见性做仲裁。不可横向扩展,但有秒级多机主备切换能力。
• 调度信息收集监控服务(FuxiEye):统计集群中每台机的运行状态信息,给Scheduler提供调度决策支持,可以横向扩展。
• 用户接口服务(ApiServer):为资源调度系统提供外部调用的总入口,会根据Coordinator提供的Meta信息将用户请求路由到资源调度系统具体的某一个服务上,可以横向扩展。
图. 伏羲多调度器新架构
2.3 上线数据
以下是10w规模集群/10万作业并发场景调度器核心指标(5个Scheduler、5个RMS,单RMS负责2w台机器,单Scheduler并发处理2w个作业)。通过数据可以看到,集群10w台机器的调度利用率超过了99%,关键调度指标,单Scheduler向RMS commit的slot的平均数目达到了1w slot/s。
在保持原有单调度器各项核心指标稳定不变的基础上,去中心化的多调度器框架实现了机器规模和应用并发度的双向扩展,彻底解决了集群的可扩展性问题。
目前资源调度的新架构已全面上线,各项指标持续稳定。在多调度器架构基础上,我们把机器学习场景调度策略进行了分离,通过独立的调度器来进行持续的优化。同时通过测试专用的调度器,我们也让资源调度具备了灰度能力,调度策略的开发和上线周期显著缩短。
4. 计算调度2.0 - 从静态到动态
分布式作业的执行与单机作业的最大区别,在于数据的处理需要拆分到不同的计算节点上,“分而治之”的执行。这个“分”,包括数据的切分,聚合以及对应的不同逻辑运行阶段的区分,也包括在逻辑运行阶段间数据的shuffle传输。每个分布式作业的中心管理点,也就是application master (AM)。这个管理节点也经常被称为DAG (Directional Acyclic Graph, 有向无环图) 组件,是因为其最重要的责任,就是负责协调分布式系统中的作业执行流程,包括计算节点的调度以及数据流(shuffle)。
对于作业的逻辑阶段和各个计算节点的管理, 以及shuffle策略的选择/执行,是一个分布式作业能够正确完成重要前提。这一特点,无论是传统的MR作业,分布式SQL作业,还是分布式的机器学习/深度学习作业,都是一脉相承的,为了帮助更好的理解计算调度(DAG和Shuffle)在大数据平台中的位置,我们可以通过MaxCompute分布式SQL的执行过程做为例子来了解:
在这么一个简单的例子中,用户有一张订单表order_data,存储了海量的交易信息,用户想所有查询花费超过1000的交易订单按照userid聚合后,每个用户的花费之和是多少。于是提交了如下SQL query:
INSERT OVERWRITE TABLE result
SELECT userid, SUM(spend)
FROM order_data
WHERE spend > 1000
GROUP BY userid;
这个SQL经过编译优化之后生成了优化执行计划,提交到fuxi管理的分布式集群中执行。我们可以看到,这个简单的SQL经过编译优化,被转换成一个具有M->R两个逻辑节点的DAG图,也就是传统上经典的MR类型作业。而这个图在提交给fuxi系统后,根据每个逻辑节点需要的并发度,数据传输边上的shuffle方式,调度时间等等信息,就被物化成右边的物理执行图。物理图上的每个节点都代表了一个具体的执行实例,实例中包含了具体处理数据的算子,特别的作为一个典型的分布式作业,其中包含了数据交换的算子shuffle——负责依赖外部存储和网络交换节点间的数据。一个完整的计算调度,包含了上图中的DAG的调度执行以及数据shuffle的过程。
阿里计算平台的fuxi计算调度,经过十年的发展和不断迭代,成为了作为阿里集团内部以及阿里云上大数据计算的重要基础设施。今天计算调度同时服务了以MaxCompute SQL和PAI为代表的多种计算引擎,在近10万台机器上日均运行着千万界别的分布式DAG作业,每天处理EB数量级的数据。一方面随着业务规模和需要处理的数据量的爆发,这个系统需要服务的分布式作业规模也在不断增长;另一方面,业务逻辑以及数据来源的多样性,计算调度在阿里已经很早就跨越了不同规模上的可用/够用的前中期阶段,2.0上我们开始探索更加前沿的智能化执行阶段。
在云上和阿里集团的大数据实践中,我们发现对于计算调度需要同时具备超大规模和智能化的需求,以此为基本诉求我们开了Fuxi计算调度2.0的研发。下面就为大家从DAG调度和数据shuffle两个方面分别介绍计算调度2.0的工作。
4.1 Fuxi DAG 2.0--动态、灵活的分布式计算生态
4.1.1 DAG调度的挑战
传统的分布式作业DAG,一般是在作业提交前静态指定的,这种指定方式,使得作业的运行没有太多动态调整的空间。放在DAG的逻辑图与物理图的背景中来说,这要求分布式系统在运行作业前,必须事先了解作业逻辑和处理数据各种特性,并能够准确回答作业运行过程,各个节点和连接边的物理特性问题,然而在现实情况中,许多和运行过程中数据特性相关的问题,都只有个在执行过程中才能被最准确的获得。静态的DAG执行,可能导致选中的是非最优的执行计划,从而导致各种运行时的效率低下,甚至作业失败。这里我们可以用一个分布式SQL中很常见的例子来说明:
SELECT a.spend, a.userid, b.age
FROM (
SELECT spend, userid
FROM order_data
WHERE spend > 1000
) a
JOIN (
SELECT userid, age
FROM user
WHERE age > 60
) b
ON a.userid = b.userid;
上面是一个简单的join的例子,目的是获取60岁以上用户花费大于1000的详细信息,由于年纪和花费在两张表中,所以此时需要做一次join。一般来说join有两种实现方式:
一是Sorted Merge Join(如下图左侧的所示):也就是对于a和b两个子句执行后的数据按照join key(userid)进行分区,然后在下游节点按照相同的key进行Merge Join操作,实现Merge Join需要对两张表都要做shuffle操作——也就是进行一次数据狡猾,特别的如果有数据倾斜(例如某个userid对应的交易记录特别多),这时候MergeJoin过程就会出现长尾,影响执行效率;
二是实现方式是Map join(Hash join)的方式(如下图右侧所示):上述sql中如果60岁以上的用户信息较少,数据可以放到一个计算节点的内存中,那对于这个超小表可以不做shuffle,而是直接将其全量数据broadcast到每个处理大表的分布式计算节点上,大表不用进行shuffle操作,通过在内存中直接建立hash表,完成join操作,由此可见map join优化能大量减少 (大表) shuffle同时避免数据倾斜,能够提升作业性能。但是如果选择了map join的优化,执行过程中发现小表数据量超过了内存限制(大于60岁的用户很多),这个时候query执行就会由于oom而失败,只能重新执行。
但是在实际执行过程中,具体数据量的大小,需要在上游节点完成后才能被感知,因此在提交作业前很难准确的判断是否可以采用Map join优化,从上图可以看出在Map Join和Sorted Merge Join上DAG图是两种结构,因此这需要DAG调度在执行过程中具有足够的动态性,能够动态的修改DAG图来达到执行效率的最优。我们在阿里集团和云上海量业务的实践中发现,类似map join优化的这样的例子是很普遍的,从这些例子可以看出,随着大数据平台优化的深入进行,对于DAG系统的动态性要求越来越高。
由于业界大部分DAG调度框架都在逻辑图和物理图之间没有清晰的分层,缺少执行过程中的动态性,无法满足多种计算模式的需求。例如spark社区很早提出了运行时调整Join策略的需求(Join: Determine the join strategy (broadcast join or shuffle join) at runtime),但是目前仍然没有解决。
除此上述用户体感明显的场景之外,随着MaxCompute计算引擎本身更新换代和优化器能力的增强,以及PAI平台的新功能演进,上层的计算引擎自身能力在不断的增强。对于DAG组件在作业管理,DAG执行等方面的动态性,灵活性等方面的需求也日益强烈。在这样的一个大的背景下,为了支撑计算平台下个10年的发展,伏羲团队启动了DAG 2.0的项目,在更好的支撑上层计算需求。
4.1.2 DAG2.0 动态灵活统一的执行框架
DAG2.0通过逻辑图和物理图的清晰分层,可扩展的状态机管理,插件式的系统管理,以及基于事件驱动的调度策略等基座设计,实现了对计算平台上多种计算模式的统一管理,并更好的提供了作业执行过程中在不同层面上的动态调整能力。作业执行的动态性和统一DAG执行框架是DAG2.0的两个主要特色:
作业执行的动态性
如前所诉,分布式作业执行的许多物理特性相关的问题,在作业运行前是无法被感知的。例如一个分布式作业在运行前,能够获得的只有原始输入的一些基本特性(数据量等), 对于一个较深的DAG执行而言,这也就意味着只有根节点的物理计划(并发度选择等) 可能相对合理,而下游的节点和边的物理特性只能通过一些特定的规则来猜测。这就带来了执行过程中的不确定性,因此,要求一个好的分布式作业执行系统,需要能够根据中间运行结果的特点,来进行执行过程中的动态调整。
而DAG/AM作为分布式作业唯一的中心节点和调度管控节点,是唯一有能力收集并聚合相关数据信息,并基于这些数据特性来做作业执行的动态调整。这包括简单的物理执行图调整(比如动态的并发度调整),也包括复杂一点的调整比如对shuffle方式和数据编排方式重组。除此以外,数据的不同特点也会带来逻辑执行图调整的需求:对于逻辑图的动态调整,在分布式作业处理中是一个全新的方向,也是我们在DAG 2.0里面探索的新式解决方案。
还是以map join优化作为例子,由于map join与默认join方式(sorted merge join)对应的其实是两种不同优化器执行计划,在DAG层面,对应的是两种不同的逻辑图。DAG2.0的动态逻辑图能力很好的支持了这种运行过程中根据中间数据特性的动态优化,而通过与上层引擎优化器的深度合作,在2.0上实现了业界首创的conditional join方案。如同下图展示,在对于join使用的算法无法被事先确定的时候,分布式调度执行框架可以允许优化提交一个conditional DAG,这样的DAG同时包括使用两种不同join的方式对应的不同执行计划支路。在实际执行时,AM根据上游产出数据量,动态选择一条支路执行(plan A or plan B)。这样子的动态逻辑图执行流程,能够保证每次作业运行时,根据实际产生的中间数据特性,选择最优的执行计划。在这个例子中,
- 当M1输出的数据量较小时,允许其输出被全量载入下游单个计算节点的内存,DAG就会选择优化的map join(plan A),来避免额外的shuffle和排序。
- 当M1输出的数据量大到一定程度,已经不属于map join的适用范围,DAG就可以自动选择走merge join,来保证作业的成功执行。
除了map join这个典型场景外,借助DAG2.0的动态调度能力,MaxCompute在解决其他用户痛点上也做了很多探索,并取得了不错的效果。例如智能动态并发度调整:在执行过程中依据分区数据统计调整,动态调整并发度;自动合并小分区,避免不必要的资源使用,节约用户资源使用;切分大分区,避免不必要的长尾出现等等。
统一的AM/DAG执行框架
除了动态性在SQL执行中带来的重大性能提升外,DAG 2.0抽象分层的点,边,图架构上,也使其能通过对点和边上不同物理特性的描述,对接不同的计算模式。业界各种分布式数据处理引擎,包括SPARK, FLINK, HIVE, SCOPE, TENSORFLOW等等,其分布式执行框架的本源都可以归结于Dryad提出的DAG模型。我们认为对于图的抽象分层描述,将允许在同一个DAG系统中,对于离线/实时/流/渐进计算等多种模型都可以有一个好的描述。
如果我们对分布式SQL进行细分的话,可以看见业界对于不同场景上的优化经常走在两个极端:要么优化throughput (大规模,相对高延时),要么优化latency(中小数据量,迅速完成)。前者以Hive为典型代表,后者则以Spark以及各种分布式MPP解决方案为代表。而在阿里分布式系统的发展过程中,历史上同样出现了两种对比较为显著的执行方式:SQL线离线(batch)作业与准实时(interactive)作业。这两种模式的资源管理和作业执行,过去是搭建在两套完全分开的代码实现上的。这除了导致两套代码和功能无法复用以外,两种计算模式的非黑即白,使得彼此在资源利用率和执行性能之间无法tradeoff。而在DAG 2.0模型上,通过对点/边物理特性的映射,实现了这两种计算模式比较自然的融合和统一。离线作业和准实时作业在逻辑节点和逻辑边上映射不同的物理特性后,都能得到准确的描述:
- 离线作业:每个节点按需去申请资源,一个逻辑节点代表一个调度单位;节点间连接边上传输的数据,通过落盘的方式来保证可靠性;
- 准实时作业:整个作业的所有节点都统一在一个调度单位内进行gang scheduling;节点间连接边上通过网络/内存直连传输数据,并利用数据pipeline来追求最优的性能。
在此统一离线作业与准实时作业的到一套架构的基础上,这种统一的描述方式,使得探索离线作业高资源利用率,以及准实时作业的高性能之间的tradeoff成为可能:当调度单位可以自由调整,就可以实现一种全新的混合的计算模式,我们称之为Bubble执行模式。
这种混合Bubble模式,使得DAG的用户,也就是上层计算引擎的开发者(比如MaxCompute的优化器),能够结合执行计划的特点,以及引擎终端用户对资源使用和性能的敏感度,来灵活选择在执行计划中切出Bubble子图。在Bubble内部充分利用网络直连和计算节点预热等方式提升性能,没有切入Bubble的节点则依然通过传统离线作业模式运行。在统一的新模型之上,计算引擎和执行框架可以在两个极端之间,根据具体需要,选择不同的平衡点。
4.1.3 效果
DAG2.0的动态性使得很多执行优化可以运行时决定,使得实际执行的效果更优。例如,在阿里内部的作业中,动态的conditional join相比静态的执行计划,整体获得了将近3X的性能提升。
混合Bubble执行模式平衡了离线作业高资源利用率以及准实时作业的高性能,这在1TB TPCH测试集上有显著的体现,
- Bubble相对离线作业:在多使用20%资源的情况下,Bubble模式性能提升将近一倍;
- Bubble相对准实时模式:在节省了2.6X资源情况下, Bubble性能仅下降15%;
4.2 Fuxi Shuffle 2.0 - 磁盘内存网络的最佳使用
4.2.1 背景
大数据计算作业中,节点间的数据传递称为shuffle, 主流分布式计算系统都提供了数据shuffle服务的子系统。如前述DAG计算模型中,task间的上下游数据传输就是典型的shuffle过程。
在数据密集型作业中,shuffle阶段的时间和资源使用占比非常高,有其他大数据公司研究显示,在大数据计算平台上Shuffle阶段均是在所有作业的资源使用中占比超过50%. 根据统计在MaxCompute生产中shuffle占作业运行时间和资源消耗的30-70%,因此优化shuffle流程不但可以提升作业执行效率,而且可以整体上降低资源使用,节约成本,提升MaxCompute在云计算市场的竞争优势。
从shuffle介质来看,最广泛使用的shuffle方式是基于磁盘文件的shuffle. 这种模式这种方式简单,直接,通常只依赖于底层的分布式文件系统,适用于所有类型作业。而在典型的常驻内存的实时/准实时计算中,通常使用网络直连shuffle的方式追求极致性能。Fuxi Shuffle在1.0版本中将这两种shuffle模式进行了极致优化,保障了日常和高峰时期作业的高效稳定运行。
挑战
我们先以使用最广泛的,基于磁盘文件系统的离线作业shuffle为例。
通常每个mapper生成一个磁盘文件,包含了这个mapper写给下游所有reducer的数据。而一个reducer要从所有mapper所写的文件中,读取到属于自己的那一小块。右侧则是一个系统中典型规模的MR作业,当每个mapper处理256MB数据,而下游reducer有10000个时,平均每个reducer读取来自每个mapper的数据量就是25.6KB, 在机械硬盘HDD为介质的存储系统中,属于典型的读碎片现象,因为假设我们的磁盘iops能达到1000, 对应的throughput也只有25MB/s, 严重影响性能和磁盘压力。
【基于文件系统shuffle的示意图 / 一个20000*10000的MR作业的碎片读】
分布式作业中并发度的提升往往是加速作业运行的最重要手段之一。但处理同样的数据量,并发度越高意味着上述碎片读现象越严重。通常情况下选择忍受一定的碎片IO现象而在集群规模允许的情况下提升并发度,还是更有利于作业的性能。所以碎片IO现象在线上普遍存在,磁盘也处于较高的压力水位。
一个线上的例子是,某些主流集群单次读请求size为50-100KB, Disk util指标长期维持在90%的警戒线上。这些限制了对作业规模的进一步追求。
我们不禁考虑,作业并发度和磁盘效率真的不能兼得吗?
4.2.2 Fuxi的答案:Fuxi Shuffle 2.0
引入Shuffle Service - 高效管理shuffle资源
为了针对性地解决上述碎片读问题及其引发的一连串负面效应,我们全新打造了基于shuffle service的shuffle模式。Shuffle service的最基本工作方式是,在集群每台机器部署一个shuffle
agent节点,用来归集写给同一reducer的shuffle数据。如下图
可以看到,mapper生成shuffle数据的过程变为mapper将shuffle数据通过网络传输给每个reducer对应的shuffle agent, 而shuffle agent归集一个reducer来自所有mapper的数据,并追加到shuffle磁盘文件中,两个过程是流水线并行化起来的。
Shuffle agent的归集功能将reducer的input数据从碎片变为了连续数据文件,对HDD介质相当友好。由此,整个shuffle过程中对磁盘的读写均为连续访问。从标准的TPCH等测试中可以看到不同场景下性能可取得百分之几十到几倍的提升,且大幅降低磁盘压力、提升CPU等资源利用率。
Shuffle Service的容错机制
Shuffle service的归集思想在公司内外都有不同的工作展现类似的思想,但都限于“跑分”和小范围使用。因为这种模式对于各环节的错误天生处理困难。
以shuffle agent文件丢失/损坏是大数据作业的常见问题为例,传统的文件系统shuffle可以直接定位到出错的数据文件来自哪个mapper,只要重跑这个mapper即可恢复。但在前述shuffle service流程中,由于shuffle agent输出的shuffle这个文件包含了来自所有mapper的shuffle数据,损坏文件的重新生成需要以重跑所有mapper为代价。如果这种机制应用于所有线上作业,显然是不可接受的。
我们设计了数据双副本机制解决了这个问题,使得大多数通常情况下reducer可以读取到高效的agent生成的数据,而当少数agent数据丢失的情况,可以读取备份数据,备份数据的重新生成只依赖特定的上游mapper.
具体来说,mapper产生的每份shuffle数据除了发送给对于shuffle agent外,也会按照与传统文件系统shuffle数据类似的格式,在本地写一个备份。按前面所述,这份数据写的代价较小但读取的性能不佳,但由于仅在shuffle agent那个副本出错时才会读到备份数据,所以对作业整体性能影响很小,也不会引起集群级别的磁盘压力升高。
有效的容错机制使得shuffle service相对于文件系统shuffle,在提供更好的作业性能的同时,因shuffle数据出错的task重试比例降低了一个数量级,给线上全面投入使用打好了稳定性基础。
线上生产环境的极致性能稳定性
在前述基础功能之上,Fuxi线上的shuffle系统应用了更多功能和优化,在性能、成本、稳定性等方便取得了进一步的提升。举例如下。
1. 流控和负载均衡
前面的数据归集模型中,shuffle agent作为新角色衔接了mapper的数据发送与数据落盘。分布式集群中磁盘、网络等问题可能影响这条链路上的数据传输,节点本身的压力也可能影响shuffle agent的工作状态。当因集群热点等原因使得shuffle agent负载过重时,我们提供了必要的流控措施缓解网络和磁盘的压力;和模型中一个reducer有一个shuffle agent收集数据不同,我们使用了多个shuffle agent承担同样的工作,当发生数据倾斜时,这个方式可以有效地将压力分散到多个节点上。从线上表现看,这些措施消除了绝大多数的shuffle期间拥塞流控和集群负载不均现象。
2. 故障shuffle
agent的切换
各种软硬件故障导致shuffle agent对某个reducer的数据工作不正常时,后续数据可以实时切换到其他正常shuffle agent. 这样,就会有更多的数据可以从shuffle agent侧读到,而减少低效的备份副本访问。
3. Shuffle agent数据的回追
很多时候发生shuffle
agent切换时(如机器下线),原shuffle agent生成的数据可能已经丢失或访问不到。在后续数据发送到新的shuffle agent同时,Fuxi还会将丢失的部分数据从备份副本中load起来并同样发送给新的shuffle agent, 使得后续reducer所有的数据都可以读取自shuffle agent侧,极大地提升了容错情况下的作业性能。
4. 新shuffle模式的探索
前述数据归集模型及全面扩展优化,在线上集群中单位资源处理的数据量提升了约20%, 而因出错重试的发生频率降至原来文件系统shuffle的5%左右。但这就是最高效的shuffle方式了吗?
我们在生产环境对部分作业应用了一种新的shuffle模型,这种模型中mapper的发送端和reducer的接收端都通过一个agent节点来中转shuffle流量。线上已经有部分作业使用此种方式并在性能上得到了进一步的提升。
内存数据shuffle
离线大数据作业可能承担了主要的计算数据量,但流行的大数据计算系统中有非常多的场景是通过实时/准实时方式运行的,作业全程的数据流动发生在网络和内存,从而在有限的作业规模下取得极致的运行性能,如大家熟悉的Spark, Flink等系统。
Fuxi DAG也提供了实时/准实时作业运行环境,传统的shuffle方式是通过网络直连,也能收到明显优于离线shuffle的性能。这种方式下,要求作业中所有节点都要调度起来才能开始运行,限制了作业的规模。而实际上多数场景计算逻辑生成shuffle数据的速度不足以填满shuffle带宽,运行中的计算节点等待数据的现象明显,性能提升付出了资源浪费的代价。
我们将shuffle service应用到内存存储中,以替换network传输的shuffle方式。一方面,这种模式解耦了上下游调度,整个作业不再需要全部节点同时拉起;另一方面通过精确预测数据的读写速度并适时调度下游节点,可以取得与network传输shuffle相当的作业性能,而资源消耗降低50%以上。这种shuffle方式还使得DAG系统中多种运行时调整DAG的能力可以应用到实时/准实时作业中。
4.2.3 收益
Fuxi Shuffle 2.0全面上线生产集群,处理同样数据量的作业资源比原来节省15%,仅shuffle方式的变化就使得磁盘压力降低23%,作业运行中发生错误重试的比例降至原来的5%。
【线上典型集群的性能与稳定性提升示意图(不同组数据表示不同集群)】
对使用内存shuffle的准实时作业,我们在TPCH等标准测试集中与网络shuffle性能相当,资源使用只有原来的30%左右,且支持了更大的作业规模,和DAG 2.0系统更多的动态调度功能应用至准实时作业。
5. 单机调度
大量分布式作业汇集到一台机器上,如何将单机有限的各种资源合理分配给每个作业使用,从而达到作业运行质量、资源利用率、作业稳定性的多重保障,是单机调度要解决的任务。
典型的互联网公司业务一般区分为离线业务与在线业务两种类型。在阿里巴巴,我们也同样有在线业务如淘宝、天猫、钉钉、Blink等,这类业务的特点是对响应延迟特别敏感,一旦服务抖动将会出现添加购物车失败、下单失败、浏览卡顿、钉钉消息发送失败等各种异常情况,严重影响用户体验,同时为了应对在618、双11等各种大促的情况,需要提前准备大量的机器。由于以上种种原因,日常状态这些机器的资源利用率不足10%,产生资源浪费的情况。与此同时,阿里的离线业务又是另外一幅风景,MaxCompute计算平台承担了阿里所有大数据离线计算业务类型,各个集群资源利用率常态超负载运行,数据量和计算量每年都在保持高速增长。
一方面是在线业务资源利用率不足,另一方面是离线计算长期超负载运行,那么能否将在线业务与离线计算进行混合部署,提升资源利用率同时大幅降低成本,实现共赢。
5.1 三大挑战
- 如何保障在线服务质量
在线集群的平均CPU利用率只有10%左右,混部的目标就是将剩余的资源提供给MaxCompute进行离线计算使用,从而达到节约成本的目的。那么,如何能够保障资源利用率提升的同时又能够保护在线服务不受影响呢? - 如何保障离线稳定
当资源发生冲突时,第一反应往往是保护在线,牺牲离线。毕竟登不上淘宝天猫下不了单可是大故障。可是,离线如果无限制的牺牲下去,服务质量将会出现大幅度下降。试想,我在dataworks上跑个SQL,之前一分钟就出结果,现在十几分钟甚至一个小时都跑不出来,大数据分析的同学估计也受不了了。 - 如何衡量资源质量
电商业务通过富容器的方式集成多种容器粒度的分析手段,但是前文描述过离线作业的特点,如何能够精准的对离线作业资源使用进行资源画像分析,如果能够评估资源受干扰的程度,混部集群的稳定性等问题,是对我们的又一个必须要解决的挑战
5.2 资源隔离分级管理
单机的物理资源总是有限的,按照资源特性可以大体划分为可伸缩资源与不可伸缩资源两大类。CPU、Net、IO等属于可伸缩资源,Memory属于不可伸缩资源,不同类型的资源有不同层次的资源隔离方案。另一方面,通用集群中作业类型种类繁多,不同作业类型对资源的诉求是不同的。这里包括在线、离线两个大类的资源诉求,同时也包含了各自内部不同层次的优先级二次划分需求,十分复杂。
基于此,Fuxi2.0提出了一套基于资源优先级的资源划分逻辑,在资源利用率、多层次资源保障复杂需求寻找到了解决方案。
下面我们将针对CPU分级管理进行深入描述,其他维度资源管理策略我们将在今后的文章中进行深入介绍。
CPU分级管理
通过精细的组合多种内核策略,将CPU区分为高、中、低三类优先级
隔离策略如下图所示
基于不同类型的资源对应不同的优先级作业
5.3 资源画像
Fuxi作为资源调度模块,对资源使用情况的精准画像是衡量资源分配,调查/分析/解决解决资源问题的关键。针对在线作业的资源情况,集团和业界都有较多的解决方案。这类通用的资源采集角色存在以下无法解决的问题无法应用于离线作业资源画像的数据采集阶段
1. 采集时间精度过低。大部分信息是分钟级别,而MaxCompute作业大部分运行时间在秒级。
2. 无法定位MaxCompute信息。MaxCompute是基于Cgroup资源隔离,因此以上工具无法针对作业进行针对性采集
3. 采集指标不足。有大量新内核新增的微观指标需要进行收集,过去是不支持的
为此,我们提出了FuxiSensor的资源画像方案,架构如上图所示,同时利用SLS进行数据的收集和分析。在集群、Job作业、机器、worker等不同层次和粒度实现了资源信息的画像,实现了秒级的数据采集精度。在混部及MaxCompute的实践中,成为资源问题监控、报警、稳定性数据分析、作业异常诊断、资源监控状况的统一入口,成为混部成功的关键指标。
5.4 线上效果
日常资源利用率由10%提升到40%以上
在线抖动小于5%
5.5 单机调度小结
为了解决三大挑战,通过完善的各维度优先级隔离策略,将在线提升到高优先级资源维度,我们保障了在线的服务质量稳定;通过离线内部优先级区分及各种管理策略,实现了离线质量的稳定性保障;通过细粒度资源画像信息,实现了资源使用的评估与分析,最终实现了混部在阿里的大规模推广与应用,从而大量提升了集群资源利用率,为离线计算节省了大量成本。
6. 展望
从2009到2019年历经十年的锤炼,伏羲系统仍然在不断的演化,满足不断涌现的业务新需求,引领分布式调度技术的发展。接下来,我们会从以下几个方面继续创新:
- 资源调度FuxiMaster将基于机器学习,实现智能化调度策略和动态精细的资源管理模式,进一步提高集群资源利用率,提供更强大灵活的分布式集群资源管理服务。
- 新一代DAG2.0继续利用动态性精耕细作,优化各种不同类型的作业;与SQL深入合作,解决线上痛点,推动SQL引擎深度优化,提升性能的同时也让SQL作业运行更加智能化;探索机器学习场景的DAG调度,改善训练作业的效率,提升GPU使用率。
- 数据Shuffle2.0则一方面优化shuffle流程,追求性能、成本、稳定性的极致,另一方面与DAG 2.0深入结合,提升更多场景;同时探索新的软硬件架构带来的新的想象空间。
- 智能化的精细单机资源管控,基于资源画像信息通过对历史数据分析产生未来趋势预测,通过多种资源管控手段进行精准的资源控制,实现资源利用率和不同层次服务质量的完美均衡。
最后,我们热忱欢迎集团各个团队一起交流探讨,共同打造世界一流的分布式调度系统!
MaxCompute产品官网 https://www.aliyun.com/product/odps
更多阿里巴巴大数据计算技术交流,欢迎扫码加入“MaxCompute开发者社区”钉钉群。