应用层
应用层包括三种类型的进程:
- 驱动进程(Driver):用来执行用户程序。
- 工作进程(Worker):用来执行 Driver 或者其他 Worker 指派的任务(remote functions,就是用户代码中装饰了`@ray.remote` 的那些函数)的无状态进程。工作进程在节点启动时被自动启动,一般来说会在每个物理机上启动与 CPU 同样数量的 Worker(这里还有些问题:如果节点是容器的话,获取的仍然是其所在物理机的 CPU 数)。当一个远程函数被声明时,会被注册到全局,并推送到所有 Worker。每个 Worker 顺序的执行任务,并且不维护本地状态。
- 角色进程(Actor):用来执行角色方法的有状态进程。与 Worker 被自动的启动不同,每个 Actor 会根据需求(即被调用时)被工作进程或者驱动进程显示启动。和 Worker 一样,Actor 也会顺序的执行任务,不同的是,下一个任务的执行依赖于前一个任务生成或改变的状态(即 Actor 的成员变量)。
系统层
系统层包括三个主要组件:全局控制存储(GCS,global control store),分布式调度器(distributed scheduler)和分布式对象存储(distributed object store)。所有组件都可以进行水平扩展并且支持容错。
全局控制存储(GCS)
全局状态存储维护着系统全局的控制状态信息,是我们系统独创的一个部件。其核心是一个可以进行发布订阅的键值对存储。我们通过分片(sharding)来应对扩展,每片存储通过链式副本(将所有数据副本组织成链表,来保证强一致性,见04年的一篇论文)来提供容错。提出和设计这样一个GCS的动机在于使系统能够每秒进行数百万次的任务创建和调度,并且延迟较低,容错方便。
对于节点故障的容错需要一个能够记录谱系信息(lineage information)的方案。现有的基于谱系的解决方法侧重粗粒度(比如 Spark 的 rdd)的并行,因此可以只利用单个节点(如Master or Driver)存储谱系信息,而不影响性能。然而,这种设计并不适合像仿真(simulation)一样的细粒度、动态的作业类型(workload)。因此我们将谱系信息的存储与系统其它模块解耦,使之可以独立地动态扩容。
保持低延迟意味着要尽可能降低任务调度的开销。具体来说,一个调度过程包括选择节点,分派任务,拉取远端依赖对象等等。很多现有的信息流系统,将其所有对象的位置、大小等信息集中存储在调度器上,使得上述调度过程耦合在一块。当调度器不是瓶颈的时候,这是一个很简单自然的设计。然而,考虑到 Ray 要处理的数据量级和数据粒度,需要将中心调度器从关键路径中移出(否则如果所有调度都要全局调度器经手处理,它肯定会成为瓶颈)。对于像 allreduce 这样的(传输频繁,对延迟敏感)分布式训练很重要的原语来说,每个对象传输时都经手调度器的开销是不可容忍的。因此,我们将对象的元数据存储在 GCS 中而不是中央调度器里,从而将任务分派与任务调度完全解耦。
总的来说,GCS 极大地简化了 Ray 的整体设计,因为它将所有状态揽下,从而使得系统中其他部分都变成无状态。这不仅使得对容错支持简化了很多(即,每个故障节点恢复时只需要从 GCS 中读取谱系信息就行),也使得分布式的对象存储和调度器可以进行独立的扩展(因为所有组件可以通过 GCS 来获取必要的信息)。还有一个额外的好处,就是可以更方便的开发调试、监控和可视化工具。
自下而上的分布式调度系统(Bottom-up Distributed Scheduler)
如前面提到的,Ray 需要支持每秒数百万次任务调度,这些任务可能只持续短短数毫秒。大部分已知的调度策略都不满足这些需求。常见的集群计算框架,如 Spark, CIEL, Dryad 都实现了一个中心的调度器。这些调度器具有很好的局部性(局部性原理)的特点,但是往往会有数十毫秒的延迟。像 work stealing,Sparrow 和 Canary 一样的的分布式调度器的确能做到高并发,但是往往不考虑数据的局部性特点,或者假设任务(tasks)属于不同的作业(job),或者假设计算拓扑是提前知道的。
为了满足上述需求,我们设计了一个两层调度架构,包括一个全局调度器(global scheduler)和每个节点上的本地调度器(local scheduler)。为了避免全局调度器过载,每个节点(node)上创建的任务会被先提交到本地调度器。本地调度器总是先尝试将任务在本地执行,除非其所在机器过载(比如任务队列超过了预定义的阈值)或者不能满足任务任务的资源需求(比如,缺少 GPU)。如果本地调度器发现不能在本地执行某个任务,会将其转发给全局调度器。由于调度系统都倾向于首先在本地调度任务(即在调度结构层级中的叶子节点),我们将其称为自下而上的调度系统(可以看出,本地调度器只是根据本节点的局部负载信息进行调度,而全局调度器会根据全局负载来分派任务;当然前提是资源约束首先得被满足)。
图6 这是调度系统示意图,任务自下而上被提交:任务首先被驱动进程(Drivers)或者工作进程(Workers)提交到本地调度器,然后在需要的时候会由本地调度器转给全局调度器进行处理。图中箭头的粗细程度代表其请求的繁忙程度。
全局调度器根据每个节点的负载状况和资源请求约束来决定调度策略。细化一下就是,全局调度器首先确定所有满足任务资源要求的节点,然后在其中选择具有最小预估排队时间(estimated waiting time)的一个,将任务调度过去。在给定的节点上,预估排队时间是下述两项时间的和:1)任务在节点上的排队时间 (任务队列长度乘上平均执行时间);2)任务依赖的远程对象的预估传输时间(所有远程输入的大小除以平均带宽)。全局调度器通过心跳获取到每个节点的任务排队情况和可用资源信息,从 GCS 中得到任务所有输入的位置和大小。然后,全局调度器通过移动指数平均(exponential averaging)的方法来计算任务平均执行时间和平均传输带宽。如果全局调度器成为了系统瓶颈,我们可以实例化更多的副本来分摊流量,它们通过 GCS来共享全局状态信息。如此一来,我们的调度架构具有极高可扩展性。
任务生命周期
(注:这部分是从代码中的设计文档翻译而来,注意这只是截止到2019.04.21 的设计)
在实现的时候,每个任务具有以下几种状态。任意时刻,任务都会处在这几种状态之一。
- 可放置(Placeable):任务已经准备好被调度到(本地或者远程)节点上,具体如何调度,前一段已经说明。注意该状态不表示放置位置已经最终确定,还可能被再一次被从某处调度出去。
- 等待角色创建(WaitActorCreation):一个角色方法(task)等待其所在角色实例化完毕。一旦角色被创建,该任务会被转给运行该角色的远端机器进行处理。
- 等待中(Waiting):等待该任务参数需求被满足,即,等待所有远端参数对象传送到本地对象存储中。
- 准备好(Ready):任务准备好了被运行,也就说所有所需参数已经在本地对象存储中就位了。
- 运行中(Running):任务已经被分派,并且正在本地工作进程(worker)或者角色进程(actor)中运行。
- 被阻塞(Blocked):当前任务由于其依赖的数据不可用而被阻塞住。如,嵌套调用时,该任务启动了另外的远程任务并且等待其完成,以取得结果。
- 不可行(infeasible):任务的资源要求在任何一台机器上都得不到满足。
--------------------------------- | | | forward | forward |---------------- | node with ------| | arguments | resources forward| | resource | local | actor/worker joins | v available | --------> | available ---------------------- Placeable ----------> Waiting Ready ---------> Running | | | ^ ^ <-------- ^ | ^ | |--------- | | | local arg | | | | | | | | evicted | worker | | worker | | actor | | | | blocked | | unblocked | resources | created | | actor | --------------- | | | infeasible | | | created | actor | | | | | | (remote) | created v | | | v | | (local) Blocked | | WaitForActorCreation---------- | v ----Infeasible
基于内存的分布式对象存储
为了降低任务的延迟,我们实现了一个基于内存的分布式存储系统以存储每个任务(无状态的计算过程)的输入和输出。在每个节点上,我们以共享内存(shared memory)的方式实现了对象存储。这使得同一节点上的不同任务以零拷贝的代价进行数据共享。至于数据格式,我们选择了 Apache Arrow。
如果一个任务的输入(即函数的参数对象)不在本地,在该任务执行之前,输入会被拷贝到本地的对象存储中。同时,任务执行完毕后,会将输出也写到本地得对象存储中。对象拷贝消除了热数据所造成的潜在的瓶颈,并且通过将任务的数据读写都限制在本地内存中以缩短执行时间。这些做法增加了计算密集型工作任务的吞吐量,而很多 AI 应用都是计算密集型的。为了降低延迟,我们将用到的对象全部放在内存中,只有在内存不够的时候才通过 LRU 算法将一些对象挤出内存(从API 可以看出,每个节点的内存上限可以在启动节点时通过参数指定。此外用 LRU 作为垃圾回收算法还是有点粗暴,如果不同类型的任务负载跑在同一个 ray 集群上,可能导致资源的互相争抢,从而有大量的资源换出然后重建,从而严重影响效率)。
和现有的计算框架的集群(如Spark, Dryad)一样,对象存储只接受不可变数据(immutable data)。这种设计避免了对复杂的一致性协议的需求(因为对象数据从来不需要进行更新),并且简化了数据的容错支持。当有节点出现故障时,Ray 通过重新执行对象谱系图来恢复任意所需对象(也就是说不用整个恢复该宕机节点所有状态,只需要按需恢复后面计算所需数据,用不到的数据丢了就丢了吧)。在工作开始之前,存放在 GCS 的谱系信息追踪了所有无状态的任务和有状态的角色;我们利用前者对丢失对象进行重建(结合上一段,如果一个任务有大量的迭代,并且都是远程执行,会造成大量的中间结果对象,将内存挤爆,从而使得较少使用但是稍后可能使用的全局变量挤出内存,所以 LRU 有点粗暴,听说现在在酝酿基于引用计数的GC)。
为了简化实现,我们的对象存储不支持分布式的对象。也就是说,每个对象必须能够在单节点内存下,并且只存在于单节点中。对于大矩阵、树状结构等大对象,可以在应用层来拆分处理,比如说实现为一个集合。
实现
Ray 是一个由加州大学伯克利分校开发的一个活跃的开源项目。Ray 深度整合了 Python,你可以通过 pip install ray
来安装 ray。整个代码实现包括大约 40K 行,其中有 72% C++ 实现的系统层和 28% 的 Python 实现的应用层(截止目前,又增加了对 Java 的支持)。GCS 的每个分片使用了一个 Redis 的 key-val 存储,并且只设计单个键值对操作。GCS 的表通过按任务ID、数据对象集合进行切分来进行平滑扩展。每一片利用链式冗余策略(chained-replcated)来容错。我们将本地调度器和全局调度器都实现为了单线程、事件驱动的进程。本地调度器缓存了本地对象元信息,被阻塞的任务队列和等待调度的任务队列。为了在不同节点的对象存储之间无感知的传输超大对象,我们将大对象切片,利用多条 TCP 连接来并行传。
将所有碎片捏一块
图 7 通过一个简单的 a
加 b
(a,b
可以是标量,向量或者矩阵)然后返回 c
的例子展示了 Ray 端到端的工作流。远程函数 add()
在初始化 ( ray.init
) 的时候,会自动地被注册到 GCS 中,进而分发到集群中的每个工作进程。(图7a 的第零步)
图7a 展示了当一个驱动进程(driver)调用 add.remote(a, b)
,并且 a, b
分别存在节点 N1 和 N2 上时 ,Ray 的每一步操作。驱动进程将任务 add(a, b)
提交到本地调度器(步骤1),然后该任务请求被转到全局调度器(步骤2)(如前所述,如果本地任务排队队列没有超过设定阈值,该任务也可以在本地进行执行)。接着,全局调度器开始在 GCS 中查找 add(a, b)
请求中参数 a, b
的位置(步骤3),从而决定将该任务调度到节点 N2 上(因为 N2 上有其中一个参数 b
)(步骤4)。N2 节点上的本地调度器收到请求后(发现满足本地调度策略的条件,如满足资源约束,排队队列也没超过阈值,就会在本地开始执行该任务),会检查本地对象存储中是否存在任务 add(a, b)
的所有输入参数(步骤5)。由于本地对象存储中没有对象 a
,工作进程会在 GCS 中查找 a
的位置(步骤6)。这时候发现 a
存储在 N1 中,于是将其同步到本地的对象存储中(步骤7)。由于任务 add()
所有的输入参数对象都存在了本地存储中,本地调度器将在本地工作进程中执行 add()
(步骤8),并通过共享存储访问输入参数(步骤9)。
图 7b 展现了在 N1 上执行 ray.get()
和在 N2 上执行 add()
后所触发的逐步的操作。一旦 ray.get(id)
被调用,N1 上的用户驱动进程会在本地对象存储中查看该 id (即由远程调用 add()
返回的 future 值,所有 object id 是全局唯一的,GCS 可以保证这一点)对应的对象 c
是否存在(步骤1)。由于本地对象存储中没有 c
, 驱动进程会去 GCS 中查找 c
的位置。在此时,发现 GCS 中并没有 c 的存在,因为 c 根本还没有被创建出来。于是,N1 的对象存储向 GCS 中的对象表(Object Table)注册了一个回调函数,以监听 c
对象被创建事件(步骤2)。与此同时,在节点 N2 上,add() 任务执行完毕,将结果 c
存到其本地对象存储中(步骤3),同时也将 c
的位置信息添加到 GCS 的对象存储表中(步骤4)。GCS 监测到 c
的创建,会去触发之前 N1 的对象存储注册的回调函数(步骤5)。接下来,N1 的对象存储将 c
从 N2 中同步过去(步骤6),从而结束该任务。
尽管这个例子中涉及了大量的 RPC调用,但对于大部分情况来说,RPC 的数量会小的多,因为大部分任务会在本地被调度执行,而且 GCS 回复的对象信息会被本地调度器和全局调度器缓存(但是另一方面,执行了大量远程任务之后,本地对象存储很容易被撑爆)。
名词对照
workloads:工作负载,即描述任务需要做的工作。
GCS:Global Control Store,全局控制信息存储。
Object Table:存在于 GCS 中的对象表,记录了所有对象的位置等信息(objectId -> location)。
Object Store:本地对象存储,在实现中叫 Plasma,即存储任务所需对象的实例。
Lineage:血统信息,谱系信息;即计算时的数据变换前后的相继关系图。
Node:节点;Ray 集群中的每个物理节点。
Driver、Worker:驱动进程和工作进程,物理表现形式都是 Node 上的进程。但前者是用户侧使用 ray.init
时候生成的,随着 ray.shutdown
会进行销毁。后者是 ray 在启动的时在每个节点启动的无状态的驻留工作进程,一般和物理机 CPU 数量相同。
Actor:角色对象,语言层面,就是一个类;物理层面,表现为某个节点上的一个角色进程,维护了该角色对象内的所有上下文(角色成员变量)。
Actor method:角色方法,语言层面,就是类的成员方法;其所有输入包括显式的函数参数和隐式的成员变量。
Remote function:远程函数,即通过 @ray.remote 注册到系统的函数。在其被调度时,称为一个任务(Task)。
Job,Task:文中用到了不少 Job 和 Task 的概念,但是这两个概念在 CS 中其实定义比较模糊,不如进程和线程一般明确。Task 在本论文是对一个远程函数(remote action)或者一个 actor 的远程方法(remote method)的封装。而 Job 在当前的实现中并不存在,只是一个逻辑上的概念,其含义为运行一次用户侧代码所所涉及到的所有生成的 Task 以及产生的状态的集合。
Scheduler:paper 中统一用的 scheduler,但是有的是指部分(local scheduler 和 global scheduler),这时我翻译为调度器,有时候是指 Ray 中所有调度器构成的整体,这时我翻译为调度系统。
exponential averaging:我翻译成了移动指数平均,虽然他没有写移动。对于刚过去的前 n 项,以随着时间渐进指数增长的权重做加权平均。计算时候可以通过滑动窗口的概念方便的递推计算。
Future:这个我暂时不知道怎么翻,大概意思就是对于异步调用中的返回值句柄。相关信息可以参见维基百科 Future 和 promise。
引用
[1] 官方文档:https://ray.readthedocs.io/en/latest/
[2] 系统论文:https://www.usenix.org/system/files/osdi18-moritz.pdf
[3] 系统源码:https://github.com/ray-project/ray