导读
继 Spark 之后,UC Berkeley AMP 实验室又推出一重磅高性能AI计算引擎——Ray,号称支持每秒数百万次任务调度。那么它是怎么做到的呢?在试用之后,简单总结一下:
- 极简 Python API 接口:在函数或者类定义时加上
ray.remote
的装饰器并做一些微小改变,就能将单机代码变为分布式代码。这意味着不仅可以远程执行纯函数,还可以远程注册一个类(Actor模型),在其中维护大量context(成员变量),并远程调用其成员方法来改变这些上下文。 - 高效数据存储和传输:每个节点上通过共享内存(多进程访问无需拷贝)维护了一块局部的对象存储,然后利用专门优化过的 Apache Arrow格式来进行不同节点间的数据交换。
- 动态图计算模型:这一点得益于前两点,将远程调用返回的 future 句柄传给其他的远程函数或者角色方法,即通过远程函数的嵌套调用构建复杂的计算拓扑,并基于对象存储的发布订阅模式来进行动态触发执行。
- 全局状态维护:将全局的控制状态(而非数据)利用 Redis 分片来维护,使得其他组件可以方便的进行平滑扩展和错误恢复。当然,每个 redis 分片通过 chain-replica 来避免单点。
- 两层调度架构:分本地调度器和全局调度器;任务请求首先被提交到本地调度器,本地调度器会尽量在本地执行任务,以减少网络开销。在资源约束、数据依赖或者负载状况不符合期望时,会转给全局调度器来进行全局调度。
当然,还有一些需要优化的地方,比如 Job 级别的封装(以进行多租户资源配给),待优化的垃圾回收算法(针对对象存储,现在只是粗暴的 LRU),多语言支持(最近支持了Java,但不知道好不好用)等等。但是瑕不掩瑜,其架构设计和实现思路还是有很多可以借鉴的地方。
动机和需求
(开发 Ray 的动机始于强化学习(RL),但是由于其计算模型强大表达能力,使用绝不限于 RL。这一小节是以描述 RL 系统需求为契机,引出 Ray 的初始设计方向。但是由于不大熟悉强化学习,一些名词可能表达翻译不准确。如果只对其架构感兴趣,完全可以跳过这一节)
图1:一个 RL 系统的例子
我们从考虑 RL 系统的基本组件开始,逐渐完善 Ray 的需求。如图1所示,在一个 RL 系统的的设定中,智能体(agent)会反复与环境(environment)进行交互。智能体的目标是学习出一种最大化奖励(reward)的策略。策略(policy)本质上是从环境中状态到行为抉择(action)的一种映射。至于环境,智能体,状态,行为和奖励值的详细定义,则是由具体的应用所决定的。
为了学习策略,智能体通常要进行两步操作:1)策略评估(policy evaluation)和 2)策略优化(policy improvement)。为了评估一个策略,智能体和环境持续进行交互(一般是仿真的环境)以产生轨迹(trajectories)。轨迹是在当前环境和给定策略下产生的一个二元组(状态,奖励值)序列。然后,智能体根据这些轨迹来反馈优化该策略,即,向最大化奖励值的梯度方向更新策略。图2展示了智能体用来学习策略一个例子的伪码。该伪码通过调用 rollout(environment, policy)
来评估策略,进而产生仿真轨迹。train_policy()
接着会用这些轨迹作为输入,调用 policy.update(trajectories)
来优化当前策略。会重复迭代这个过程直到策略收敛。
// evaluate policy by interacting with env. (e.g., simulator) rollout(policy, environment): trajectory = [] state = environment.initial_state() while (not environment.has_terminated()): action = policy.compute(state) // Serving state, reward = environment.step(action) // Simulation trajectory.append(state, reward) return trajectory // improve policy iteratively until it converges train_policy(environment): policy = initial_policy() while (policy has not converged): trajectories = [] for i from 1 to k: // evaluate policy by generating k rollouts trajectories.append(rollout(policy, environment)) // improve policy policy = policy.update(trajectories) // Training return policy
图2:一段用于学习策略的典型的伪代码
由此看来,针对 RL 应用的计算框架需要高效的支持模型训练(training),在线预测(serving)和平台仿真(simulation)(如图1所示)。接下来,我们简要说明一下这些工作负载(workloads)。
模型训练一般会涉及到在分布式的环境中跑随机梯度下降模型(stochastic gradient descent,SGD)来更新策略。而分布式 SGD 通常依赖于 allreduce 聚合步骤或参数服务器(parameter server).
在线预测 使用已经训练好的策略并基于当前环境来给出动作决策。预测系统通常要求降低预测延迟,提高决策频次。为了支持扩展,最好能够将负载均摊到多节点上来协同进行预测。
最后,大多数现存的 RL 应用使用仿真(simulations) 来对策略进行评估——因为现有的 RL 算法不足以单独依赖从与物理世界的交互中高效的进行取样。这些仿真器在复杂度上跨度极大。也许只需要几毫秒(如模拟国际象棋游戏中的移动),也许会需要几分钟(如为了一个自动驾驶的车辆模拟真实的环境)。
与模型训练和在线预测可以在不同系统中进行处理的监督学习相比, RL 中所有三种工作负载都被紧耦合在了单个应用中,并且对不同负载间的延迟要求很苛刻。现有的系统中还没有能同时支持三种工作负载的。理论上,可以将多个专用系统组合到一块来提供所有能力,但实际上,子系统间的结果传输的延迟在 RL 下是不可忍受的。因此,RL 的研究人员和从业者不得不针对每个需求单独构建多套一次性的专用系统。
这些现状要求为 RL 开发全新的分布式框架,可以有效地支持训练,预测和仿真。尤其是,这样的框架应具有以下能力:
支持细粒度,异构的计算。RL 计算的运行持续时间往往从数毫秒(做一个简单的动作)到数小时(训练一个复杂的策略)。此外,模型训练通常需要各种异构的硬件支持(如CPU,GPU或者TPU)。
提供灵活的计算模型。RL 应用同时具有有状态和无状态类型的计算。无状态的计算可以在系统中的任何节点进行执行,从而可以方便的进行负载均衡和按需的数据传输。因此,无状态的计算非常适合细粒度的仿真和数据处理,例如从视频或图片中提取特征。相比之下,有状态的计算适合用来实现参数服务器、在支持 GPU 运算的数据上进行重复迭代或者运行不暴露内部状态参数的第三方仿真器。
动态的执行能力。RL 应用中的很多模块要求动态的进行执行,因为他们计算完成的顺序并不总是预先确定(例如:仿真的完成顺序),并且,一个计算的运行结果可以决定是否执行数个将来的计算(如,某个仿真的运行结果将决定我们是否运行更多的仿真)。
除此之外,我们提出了两个额外的要求。首先,为了高效的利用大型集群,框架必须支持每秒钟数百万次的任务调度。其次,框架不是为了支持从头开始实现深度神经网络或者复杂的仿真器,而是必须和现有的仿真器(OpenAI gym等)和深度学习框架(如TensorFlow,MXNet,Caffe, PyTorch)无缝集成。
语言和计算模型
Ray 实现了动态任务图计算模型,即,Ray 将应用建模为一个在运行过程中动态生成依赖的任务图。在此模型之上,Ray 提供了角色模型(Actor)和并行任务模型(task-parallel)的编程范式。Ray 对混合计算范式的支持使其有别于与像 CIEL 一样只提供并行任务抽象和像 Orleans 或 Akka 一样只提供角色模型抽象的系统。
编程模型
任务模型(Tasks)。一个任务表示一个在无状态工作进程执行的远程函数(remote function)。当一个远程函数被调用的时候,表示任务结果的 future 会立即被返回(也就是说所有的远程函数调用都是异步的,调用后会立即返回一个任务句柄)。可以将 Futures传给 ray.get()
以阻塞的方式获取结果,也可以将 Futures 作为参数传给其他远程函数,以非阻塞、事件触发的方式进行执行(后者是构造动态拓扑图的精髓)。Futures 的这两个特性让用户在构造并行任务的同时指定其依赖关系。下表是 Ray 的所有 API(相当简洁而强大,但是实现起来会有很多坑,毕竟所有装饰有 ray.remote
的函数或者类及其上下文都要序列化后传给远端节点,序列化用的和 PySpark 一样的 cloudpickle)。
Name | Description |
futures = f.remote(args) | Execute function f remotely. f.remote() can take objects or futures as inputs and returns one or more futures. This is non-blocking. |
objects = ray.get(futures) | Return the values associated with one or more futures. This is blocking. |
ready futures = ray.wait(futures, k, timeout) | Return the futures whose corresponding tasks have completed as soon as either k have completed or the timeout expires. |
actor = Class.remote(args) futures = actor.method.remote(args) |
Instantiate class Class as a remote actor, and return a handle to it. Call a method on the remote actor and return one or more futures. Both are non-blocking. |
表1 Ray API
远程函数作用于不可变的物体上,并且应该是无状态的并且没有副作用的:这些函数的输出仅取决于他们的输入(纯函数)。这意味着幂等性(idempotence),获取结果出错时只需要重新执行该函数即可,从而简化容错设计。
角色模型(Actors)。一个角色对象代表一个有状态的计算过程。每个角色对象暴露了一组可以被远程调用,并且按调用顺序依次执行的成员方法(即在同一个角色对象内是串行执行的,以保证角色状态正确的进行更新)。一个角色方法的执行过程和普通任务一样,也会在远端(每个角色对象会对应一个远端进程)执行并且立即返回一个 future;但不同的是,角色方法会运行在一个有状态(stateful)的工作进程上。一个角色对象的句柄(handle)可以传递给其他角色对象或者远程任务,从而使他们能够在该角色对象上调用这些成员函数。
Tasks | Actors |
细粒度的负载均衡 | 粗粒度的负载均衡 |
支持对象的局部性(对象存储cache) | 比较差的局部性支持 |
微小更新开销很高 | 微小更新开销不大 |
高效的错误处理 | 检查点(checkpoint)恢复带来较高开销 |
表2 任务模型 vs. 角色模型的对比
表2 比较了任务模型和角色模型在不同维度上的优劣。任务模型利用集群节点的负载信息和依赖数据的位置信息来实现细粒度的负载均衡,即每个任务可以被调度到存储了其所需参数对象的空闲节点上;并且不需要过多的额外开销,因为不需要设置检查点和进行中间状态的恢复。与之相比,角色模型提供了极高效的细粒度的更新支持,因为这些更新作用在内部状态(即角色成员变量所维护的上下文信息)而非外部对象(比如远程对象,需要先同步到本地)。后者通常来说需要进行序列化和反序列化(还需要进行网络传输,因此往往很费时间)。例如,角色模型可以用来实现参数服务器(parameter servers)和基于GPU 的迭代式计算(如训练)。此外,角色模型可以用来包裹第三方仿真器(simulators)或者其他难以序列化的对象(比如某些模型)。
为了满足异构性和可扩展性,我们从三个方面增强了 API 的设计。首先,为了处理长短不一的并发任务,我们引入了 ray.wait()
,它可以等待前 k 个结果满足了就返回;而不是像 ray.get()
一样,必须等待所有结果都满足后才返回。其次,为了处理对不同资源纬度( resource-heterogeneous)需求的任务,我们让用户可以指定所需资源用量(例如装饰器:ray.remote(gpu_nums=1)
),从而让调度系统可以高效的管理资源(即提供一种交互手段,让调度系统在调度任务时相对不那么盲目)。最后,为了提灵活性,我们允许构造嵌套远程函数(nested remote functions),意味着在一个远程函数内可以调用另一个远程函数。这对于获得高扩展性是至关重要的,因为它允许多个进程以分布式的方式相互调用(这一点是很强大的,通过合理设计函数,可以使得可以并行部分都变成远程函数,从而提高并行性)。
计算模型
Ray 采用的动态图计算模型,在该模型中,当输入可用(即任务依赖的所有输入对象都被同步到了任务所在节点上)时,远程函数和角色方法会自动被触发执行。在这一小节,我们会详细描述如何从一个用户程序(图3)来构建计算图(图4)。该程序使用了表1 的API 实现了图2 的伪码。
@ray.remote def create_policy(): # Initialize the policy randomly. return policy @ray.remote(num_gpus=1) class Simulator(object): def __init__(self): # Initialize the environment. self.env = Environment() def rollout(self, policy, num_steps): observations = [] observation = self.env.current_state() for _ in range(num_steps): action = policy(observation) observation = self.env.step(action) observations.append(observation) return observations @ray.remote(num_gpus=2) def update_policy(policy, *rollouts): # Update the policy. return policy @ray.remote def train_policy(): # Create a policy. policy_id = create_policy.remote() # Create 10 actors. simulators = [Simulator.remote() for _ in range(10)] # Do 100 steps of training. for _ in range(100): # Perform one rollout on each actor. rollout_ids = [s.rollout.remote(policy_id) for s in simulators] # Update the policy with the rollouts. policy_id = update_policy.remote(policy_id, *rollout_ids) return ray.get(policy_id)
图3:在 Ray 中实现图2逻辑的代码,注意装饰器 `@ray.remote会将被注解的方法或类声明为远程函数或者角色对象。调用远程函数或者角色方法后会立即返回一个 future 句柄,该句柄可以被传递给随后的远程函数或者角色方法,以此来表达数据间的依赖关系。每个角色对象包含一个环境对象
self.env` ,这个环境状态为所有角色方法所共享。
在不考虑角色对象的情况下,在一个计算图中有两种类型的点:数据对象(data objects)和远程函数调用(或者说任务)。同样,也有两种类型的边:数据边(data edges)和控制边(control edges)。数据边表达了数据对象任务间的依赖关系。更确切来说,如果数据对象 D 是任务 T 的输出,我们就会增加一条从 T 到 D 的边。类似的,如果 D是 任务 T 的输入,我们就会增加一条 D 到 T 的边。控制边表达了由于远程函数嵌套调用所造成的计算依赖关系,即,如果任务 T1 调用任务 T2, 我们就会增加一条 T1 到 T2 的控制边。
在计算图中,角色方法调用也被表示成了节点。除了一个关键不同点外,他们和任务调用间的依赖关系基本一样。为了表达同一个角色对象上的连续方法调用所形成的状态依赖关系,我们向计算图添加第三种类型的边:在同一个角色对象上,如果角色方法 Mj 紧接着 Mi 被调用,我们就会添加一条 Mi 到 Mj 的状态边(即 Mi 调用后会改变角色对象中的某些状态,或者说成员变量;然后这些变化后的成员变量会作为 Mj 调用的隐式输入;由此,Mi 到 Mj 间形成了某种隐式依赖关系)。这样一来,作用在同一角色对象上的所有方法调用会形成一条由状态边串起来的调用链(chain,见图4)。这条调用链表达了同一角色对象上方法被调用的前后相继的依赖关系。
图4:该图与图3 train_policy.remote()
调用相对应。远程函数调用和角色方法调用对应图中的任务(tasks)。该图中显示了两个角色对象A10和A20,每个角色对象的方法调用(被标记为 A1i 和 A2i 的两个任务)之间都有状态边(stateful edge)连接,表示这些调用间共享可变的角色状态。从 train_policy
到被其调用的任务间有控制边连接。为了并行地训练多种策略,我们可以调用 train_policy.remote()
多次。
状态边让我们将角色对象嵌入到无状态的任务图中,因为他们表达出了共享状态、前后相继的两个角色方法调用之间的隐式数据依赖关系。状态边的添加还可以让我们维护谱系图(lineage),如其他数据流系统一样,我们也会跟踪数据的谱系关系以在必要的时候进行数据的重建。通过显式的将状态边引入数据谱系图中,我们可以方便的对数据进行重建,不管这些数据是远程函数产生的还是角色方法产生的(小节4.2.3中会详细讲)。
架构
Ray 的架构组成包括两部分:
- 实现 API 的应用层,现在包括 Python 和 Java分别实现的版本。
- 提供高扩展性和容错的系统层,用 C++ 写的,以CPython的形式嵌入包中。
图5:Ray 的架构包括两部分:系统层和应用层。前者实现了API和计算模型,后者实现了任务调度和数据管理,以满足性能要求和容错需求