之前文章写了 Ray 的论文翻译。后来我花了些时间读了读 Ray 的源码,为了学习和记忆,后续预计会出一系列的源码解析文章。为了做到能持续更新,尽量将模块拆碎些,以保持较短篇幅。另外,阅历所限,源码理解不免有偏颇指出,欢迎大家一块讨论。
概述
Ray 核心的设计之一就是基于资源定制的细粒度、高吞吐的任务调度。为了实现这一点,Ray 将所有输入和输出存在基于共享内存的 Plasma 中;将所有状态存在基于 Redis 的 GCS 中,然后基于此进行去中心化的调度。即每个节点都可以拿到全局信息来进行局部调度决策,不过这也是不好做复杂调度策略的原因之一。
Ray 任务分为两种,无状态的 Task 和有状态的 Actor Method,后者又可以细分为 Actor Create Method (对应构造函数)和普通 Actor Method(对应成员函数)。
Ray 是可以显式指定任务的资源(主要是 CPU 和 GPU)约束的,因此需要对所有节点的资源在框架层进行量化(ResourcesSet),以感知增加,进行分配、实现回收等等。在调度时,需要找到满足任务资源约束的节点,将任务调度过去。
由于所有 Task 的输入存在分布式的内存存储 Plasma 中,因此将 Task 调度到某个节点之后,需要对所依赖的输入进行跨节点传输。或者直接将任务调度到满足依赖的节点上,但事实上 Ray 对于一般 Task 并没有这么做,后面会详细讲原因。对于 Actor Method 来说,由于其对应 Actor 常驻某个节点,其相关的所有 Actor Method 定会调度到该节点上。
上面所说的任务所在节点、当前的状态、依赖对象的位置等等信息,都是存在全局控制存储 GCS 中的。因此每次改变状态后,要和 GCS 交互将状态写入。在由于节点失联或者宕机导致任务失败时,会根据 GCS 存的任务的状态信息对任务进行重试。通过订阅 GCS 的某些状态的变化事件,可以驱动任务状态变化。
其他的还有根据 lineage + snapshot 进行快照恢复,Actor lineage 的构建等等,这里先卖个关子,后面系列文章会详细来说。
本文主要针对所有任务的状态转移和组织形式进行展开。
状态机
复杂的任务调度必然需要一个合理的状态机来描述。以下是 Ray 文档 给出的任务状态定义和转移图。
状态定义
- 可放置(Placeable):任务准备好了被调度到某个节点上(本地或者远程)。调度决策主要是依据任务资源约束和节点剩余资源的匹配程度。当前没有考虑任务依赖对象的位置信息。如果本地节点满足任务资源需求,那么任务就被安排在本地进行执行,否则将会被转发(forward)到其他满足资源需求的节点。不过该状态决策不一定是最终决策,该任务稍后仍然可能被挤(spill over)到其他节点(因为调度那一刻满足资源,但是执行时,发现已经执行了其他任务,导致节点不满足资源约束了)。
- 等待Actor创建(WaitForActorCreation):一个 Actor Method 等待其 Actor 实例被创建(大多数发生在Actor 错误恢复时,否则一般来说是 Actor Create Method 先执行)。一旦 Actor 实例被创建,并且通过 GCS 被该 Actor Method 感知到,它就会被调度到 Actor 实例所在的节点。
- 等待(Waiting):任务等待其输入对象被满足,比如,等待任务函数参数对象从其他节点调度到本地的对象存储中。
- 就绪(Ready):任务所依赖的对象都在本地的对象存储中了,因此任务已经准备好在本地(指的是任务当前所在节点,下面也是)运行了。
- 运行(Running):任务已经调度到本地执行了,运行在本地的 Actor 或者 Worker 进程中。
- 阻塞(Blocked):任务某些依赖对象不可用(即不在本地)。不在本地怎么之前能跑呢,这里说明一下,Ray 的任务是支持嵌套调用的(对应远程函数的嵌套调用),那么一个任务 A 在运行时生成了一个任务 B ,并且等待其结果返回的话(
ray.get
)。任务 A 就会被阻塞(Blocked),等待 B 的执行结束。 - 不可放置(Infeasible):任务的资源需求不能被当前集群内任何一台机器的所有资源(注意不是剩余资源)所满足。但如果有机器新加入集群,就可以试探这些 任务的资源需求是否能够被满足了。
状态转移图
状态枚举类定义在 scheduling_queue.h
中:
enum class TaskState { // The task may be placed on a node. PLACEABLE, // The task has been placed on a node and is waiting for some object // dependencies to become local. WAITING, // The task has been placed on a node, all dependencies are satisfied, and is // waiting for resources to run. READY, // The task is running on a worker. The task may also be blocked in a ray.get // or ray.wait call, in which case it also has state BLOCKED. RUNNING, // The task has resources that cannot be satisfied by any node, as far as we // know. INFEASIBLE, // The task is an actor method and is waiting to learn where the actor was // created. WAITING_FOR_ACTOR_CREATION, // Swap queue for tasks that are in between states. This can happen when a // task is removed from one queue, and an async callback is responsible for // re-queuing the task. For example, a READY task that has just been assigned // to a worker will get moved to the SWAP queue while waiting for a response // from the worker. If the worker accepts the task, the task will be added to // the RUNNING queue, else it will be returned to READY. SWAP, // The number of task queues. All states that precede this enum must have an // associated TaskQueue in SchedulingQueue. All states that succeed // this enum do not have an associated TaskQueue, since the tasks // in those states may not have any associated task data. kNumTaskQueues, // The task is running but blocked in a ray.get or ray.wait call. Tasks that // were explicitly assigned by us may be both BLOCKED and RUNNING, while // tasks that were created out-of-band (e.g., the application created // multiple threads) are only BLOCKED. BLOCKED, // The task is a driver task. DRIVER, };
相对于状态机中的状态,此处多了几个枚举值。包括 SWAP、DRIVER。此外还有个神奇的 kNumTaskQueues,这个先按下不表,说说前两个。
- SWAP:任务的分派是异步的,即 Ray 将一个处于 Ready 状态的任务分配给某个 Worker 后。只有在回调函数中才能最终知晓是分配成功了,还是分配失败了,从而将任务状态转移到 Running 或者 Ready。但是在这个空当中,任务应该处于什么状态呢?这就是 Swap 的作用了(但不知道为什么没有显式的作为状态机中的一个状态)。
- DRIVER:这个就是标识某个任务是用户代码进程,从而将所有任务都统一来管理。
任务队列(TaskQueue)
Ray 将所有任务按状态(TaskState)聚集组织在一个个队列中, 这些队列即任务队列(TaskQueue)。每个队列定义了任务增加、删除和查找等基本操作。此外,还有一个重要的接口,就是获取该队列中所有任务所需资源的总和。比如说在调度某个任务时,想要知道某个节点对剩余可用资源,就需要用该节点的总资源,减去正在运行的任务的所需资源和就绪任务的所需资源(需要优先本地调度)。
值得一提的是,在删除任务的时候,如果 removed_tasks 参数不为空指针,则将删除的任务放到里面。这样如果多次删除,可以将任务收集到一个数组中。
还有一个比较冗余的点,即通过 task.GetTaskSpecification.TaskId()
可以获取到 task_id,不知道为什么还在 AppendTask 参数中增加 task_id 呢,为了一致性?
至于具体实现上,用了比较经典的链表+哈希方式组织。可以使得增删改查的时间都是O(1),获取全部任务的时间是 O(n)——遍历链表即可。
class TaskQueue { public: virtual ~TaskQueue() {} // 任务的增删改查操作 virtual bool AppendTask(const TaskID &task_id, const Task &task); virtual bool RemoveTask(const TaskID &task_id, std::vector<Task> *removed_tasks = nullptr); bool HasTask(const TaskID &task_id) const; const std::list<Task> &GetTasks() const; const Task &GetTask(const TaskID &task_id) const; // 获取队列中所需资源总和 const ResourceSet &GetCurrentResourceLoad() const; protected: // 链表+哈希组织,可以快速查找O(1)和线性遍历O(n) std::list<Task> task_list_; std::unordered_map<TaskID, std::list<Task>::iterator> task_map_; // 所有任务所需资源总和 ResourceSet current_resource_load_; };
在此基础上针对 Ready 这个状态又造了个 ReadyQueue
;主要是增加了 ResourceSet -> Task Ids
的映射:即增加了一个索引,将所有具有相同资源需求的就绪任务集合在一块。这样在进行调度(DispatchTasks
)时,如果发现某个任务的资源需求本地节点不能满足,那么就跳过所有具有同样资源需求的任务,算是一个调度的优化(对应逻辑在NodeManager::DispatchTasks
中)。
调度队列(SchedulingQueue)
按状态集合上述任务队列,再加以不同队列之间的任务换入换出操作,则成为调度队列(SchedulingQueue)。当 Ray 发生不同事件时,驱动任务状态机内状态进行转移,即调用 SchedulingQueue
暴露的接口,将任务从一个状态队列移到另一个状态队列中,并且做一些上下文的转换工作,以此来实现任务的调度。
需要注意的是,每个节点会维护一个调度队列,存储本节点持有的所有任务。
class SchedulingQueue { public: /// 创建一个空的调度队列,初始化各个状态对应的任务队列,就绪队列被单独拿出来用 ReadyQueue 做初始化。 SchedulingQueue() : ready_queue_(std::make_shared<ReadyQueue>()) { for (const auto &task_state : { TaskState::PLACEABLE, TaskState::WAITING, TaskState::READY, TaskState::RUNNING, TaskState::INFEASIBLE, TaskState::WAITING_FOR_ACTOR_CREATION, TaskState::SWAP, }) { if (task_state == TaskState::READY) { task_queues_[static_cast<int>(task_state)] = ready_queue_; } else { task_queues_[static_cast<int>(task_state)] = std::make_shared<TaskQueue>(); } } } // 我觉得名字起得不好,他的实际操作是获取所有就绪任务资源需求之和 ResourceSet GetResourceLoad() const; /// 单个任务的增删查 bool HasTask(const TaskID &task_id) const; const Task &GetTaskOfState(const TaskID &task_id, TaskState task_state) const; bool RemoveTask(const TaskID &task_id, Task *removed_task, TaskState *removed_task_state = nullptr); // 按状态获取任务,对于就绪状态,还需要按资源进行聚集 const std::unordered_map<ResourceSet, ordered_set<TaskID>> &GetReadyTasksWithResources() const; const std::list<Task> &GetTasks(TaskState task_state) const; // 一组任务的移来移去 std::vector<Task> RemoveTasks(std::unordered_set<TaskID> &task_ids); void QueueTasks(const std::vector<Task> &tasks, TaskState task_state); void MoveTasks(std::unordered_set<TaskID> &tasks, TaskState src_state, TaskState dst_state); void FilterState(std::unordered_set<TaskID> &task_ids, TaskState filter_state) const; /// 这两个函数是按其他维度:Actor 和 Job 来获取一组任务 std::unordered_set<TaskID> GetTaskIdsForJob(const JobID &job_id) const; std::unordered_set<TaskID> GetTaskIdsForActor(const ActorID &actor_id) const; /// 阻塞任务和用户进程增删改查 const std::unordered_set<TaskID> &GetBlockedTaskIds() const; const std::unordered_set<TaskID> &GetDriverTaskIds() const; void AddBlockedTaskId(const TaskID &task_id); void RemoveBlockedTaskId(const TaskID &task_id); void AddDriverTaskId(const TaskID &task_id); void RemoveDriverTaskId(const TaskID &task_id); /// 用来调试和监控 std::string DebugString() const; void RecordMetrics() const; /// 这个好像没啥用,都没实现 ResourceSet GetReadyQueueResources() const; private: /// 一个辅助函数,由于调度队列算是有两层索引 task state -> (task id -> task), /// 因此经常需要定位到某个状态对应的任务队列,进而获取其中的某个任务。 const std::shared_ptr<TaskQueue> &GetTaskQueue(TaskState task_state) const; /// 两个辅助函数,用来在指定状态的任务队列中删除或者过滤任务的 void RemoveTasksFromQueue(ray::raylet::TaskState task_state, std::unordered_set<ray::TaskID> &task_ids, std::vector<ray::Task> *removed_tasks); void FilterStateFromQueue(std::unordered_set<ray::TaskID> &task_ids, TaskState task_state) const; // kNumTaskQueues 作用便在此,所有int值在其之前的状态都有对应的任务队列 std::array<std::shared_ptr<TaskQueue>, static_cast<int>(TaskState::kNumTaskQueues)> task_queues_; // 调度时候,就绪队列用的比较多,就单独维护一个指针在此 const std::shared_ptr<ReadyQueue> ready_queue_; // 这两个状态(blocked 和 driver)没有对应的任务队列,只是用集合来保存id std::unordered_set<TaskID> blocked_task_ids_; std::unordered_set<TaskID> driver_task_ids_; };
从上面代码我们可以看出以下几点:
- 所有函数基本是围绕单个任务或者一组任务的增删改查而来的。
- 所有任务实际上按二层索引组织 task state -> (task id -> task);因此定位到一个任务需要先经过 task state 这一层,于是造了辅助函数来进行这层操作:
GetTaskQueue
。此外,还有大量的在不同任务队列间倒来倒去的辅助函数。 - 上面所说的 kNumTaskQueues 是一个假状态,它本质上是一个界标。将其转换为整形后,所有小于它的状态都是按任务队列组织任务,所有大于它的状态只是用集合来存了任务ID(blocked 任务和 driver 任务)。
- 对于就绪队列,有一些特殊的照顾,因为实际将就绪任务安排到某个 worker 执行时很大的一块调度内容。这些额外照顾包括:a. 单独给就绪队列维护了一个指针、 b. 提供获取就绪队列资源需求之和接口、 c. 提供按同样资源需求聚集所有就绪任务接口。
- 还有两个按照其他维度获取一组资源的接口:
GetTaskIdsForJob
和GetTaskIdsForActor
可以分别根据给定 JobId 和 ActorId 来获取一组任务。
名词释义
Task Required Resources:任务资源需求或者任务资源约束,通过在函数上添加注解 ray.remote(num_cpus=xx, num_gpus=xx)
来指定。其中 GPU 还可以指定小数个,以使多个任务共享一个 GPU。
Task argument:任务输入或者任务参数。如果翻译为输入是相对任务来说的,如果翻译为参数,是相对任务所执行的函数参数来说的。
Object:这里翻译为了数据对象。
Object Store:基于内存的不可变对象存储,是分散在各个节点的节点内、进程间的共享存储。
Node,Machine:指的是组成集群的每个机器。如果非要区分的话,Node可能更偏重逻辑上的节点,Machine 更偏重逻辑节点所在的物理机。但是在 Ray 中他们是一一对应的,即一个机器只有一个节点。
本篇就先到这里,下一篇计划写写调度策略或者资源定义。