Ray 源码解析(一):任务的状态转移和组织形式

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: Ray 源码解析(一):任务的状态转移和组织形式

之前文章写了 Ray 的论文翻译。后来我花了些时间读了读 Ray 的源码,为了学习和记忆,后续预计会出一系列的源码解析文章。为了做到能持续更新,尽量将模块拆碎些,以保持较短篇幅。另外,阅历所限,源码理解不免有偏颇指出,欢迎大家一块讨论。

概述

Ray 核心的设计之一就是基于资源定制的细粒度、高吞吐的任务调度。为了实现这一点,Ray 将所有输入和输出存在基于共享内存的 Plasma 中;将所有状态存在基于 Redis 的 GCS 中,然后基于此进行去中心化的调度。即每个节点都可以拿到全局信息来进行局部调度决策,不过这也是不好做复杂调度策略的原因之一。

Ray 任务分为两种,无状态的 Task 和有状态的 Actor Method,后者又可以细分为 Actor Create Method  (对应构造函数)和普通 Actor Method(对应成员函数)。

Ray 是可以显式指定任务的资源(主要是 CPU 和 GPU)约束的,因此需要对所有节点的资源在框架层进行量化(ResourcesSet),以感知增加,进行分配、实现回收等等。在调度时,需要找到满足任务资源约束的节点,将任务调度过去。

由于所有 Task 的输入存在分布式的内存存储 Plasma 中,因此将 Task 调度到某个节点之后,需要对所依赖的输入进行跨节点传输。或者直接将任务调度到满足依赖的节点上,但事实上 Ray 对于一般 Task 并没有这么做,后面会详细讲原因。对于 Actor Method 来说,由于其对应 Actor 常驻某个节点,其相关的所有 Actor Method 定会调度到该节点上。

上面所说的任务所在节点、当前的状态、依赖对象的位置等等信息,都是存在全局控制存储 GCS 中的。因此每次改变状态后,要和 GCS 交互将状态写入。在由于节点失联或者宕机导致任务失败时,会根据 GCS 存的任务的状态信息对任务进行重试。通过订阅 GCS 的某些状态的变化事件,可以驱动任务状态变化。

其他的还有根据 lineage + snapshot 进行快照恢复,Actor lineage 的构建等等,这里先卖个关子,后面系列文章会详细来说。

本文主要针对所有任务的状态转移和组织形式进行展开

状态机

复杂的任务调度必然需要一个合理的状态机来描述。以下是 Ray 文档 给出的任务状态定义和转移图。

状态定义

  • 可放置(Placeable):任务准备好了被调度到某个节点上(本地或者远程)。调度决策主要是依据任务资源约束和节点剩余资源的匹配程度。当前没有考虑任务依赖对象的位置信息。如果本地节点满足任务资源需求,那么任务就被安排在本地进行执行,否则将会被转发(forward)到其他满足资源需求的节点。不过该状态决策不一定是最终决策,该任务稍后仍然可能被(spill over)到其他节点(因为调度那一刻满足资源,但是执行时,发现已经执行了其他任务,导致节点不满足资源约束了)。
  • 等待Actor创建(WaitForActorCreation):一个 Actor Method 等待其 Actor 实例被创建(大多数发生在Actor 错误恢复时,否则一般来说是 Actor Create Method 先执行)。一旦 Actor 实例被创建,并且通过 GCS 被该 Actor Method 感知到,它就会被调度到 Actor 实例所在的节点。
  • 等待(Waiting):任务等待其输入对象被满足,比如,等待任务函数参数对象从其他节点调度到本地的对象存储中。
  • 就绪(Ready):任务所依赖的对象都在本地的对象存储中了,因此任务已经准备好在本地(指的是任务当前所在节点,下面也是)运行了。
  • 运行(Running):任务已经调度到本地执行了,运行在本地的 Actor 或者 Worker 进程中。
  • 阻塞(Blocked):任务某些依赖对象不可用(即不在本地)。不在本地怎么之前能跑呢,这里说明一下,Ray 的任务是支持嵌套调用的(对应远程函数的嵌套调用),那么一个任务 A 在运行时生成了一个任务  B ,并且等待其结果返回的话(ray.get)。任务 A 就会被阻塞(Blocked),等待 B 的执行结束。
  • 不可放置(Infeasible):任务的资源需求不能被当前集群内任何一台机器的所有资源(注意不是剩余资源)所满足。但如果有机器新加入集群,就可以试探这些 任务的资源需求是否能够被满足了。

20210104203500268.jpg

状态转移图

状态枚举类定义在 scheduling_queue.h 中:

enum class TaskState {
 // The task may be placed on a node.
 PLACEABLE,
 // The task has been placed on a node and is waiting for some object
 // dependencies to become local.
 WAITING,
 // The task has been placed on a node, all dependencies are satisfied, and is
 // waiting for resources to run.
 READY,
 // The task is running on a worker. The task may also be blocked in a ray.get
 // or ray.wait call, in which case it also has state BLOCKED.
 RUNNING,
 // The task has resources that cannot be satisfied by any node, as far as we
 // know.
 INFEASIBLE,
 // The task is an actor method and is waiting to learn where the actor was
 // created.
 WAITING_FOR_ACTOR_CREATION,
 // Swap queue for tasks that are in between states. This can happen when a
 // task is removed from one queue, and an async callback is responsible for
 // re-queuing the task. For example, a READY task that has just been assigned
 // to a worker will get moved to the SWAP queue while waiting for a response
 // from the worker. If the worker accepts the task, the task will be added to
 // the RUNNING queue, else it will be returned to READY.
 SWAP,
 // The number of task queues. All states that precede this enum must have an
 // associated TaskQueue in SchedulingQueue. All states that succeed
 // this enum do not have an associated TaskQueue, since the tasks
 // in those states may not have any associated task data.
 kNumTaskQueues,
 // The task is running but blocked in a ray.get or ray.wait call. Tasks that
 // were explicitly assigned by us may be both BLOCKED and RUNNING, while
 // tasks that were created out-of-band (e.g., the application created
 // multiple threads) are only BLOCKED.
 BLOCKED,
 // The task is a driver task.
 DRIVER,
};

相对于状态机中的状态,此处多了几个枚举值。包括 SWAP、DRIVER。此外还有个神奇的 kNumTaskQueues,这个先按下不表,说说前两个。

  • SWAP:任务的分派是异步的,即 Ray 将一个处于 Ready 状态的任务分配给某个 Worker 后。只有在回调函数中才能最终知晓是分配成功了,还是分配失败了,从而将任务状态转移到 Running 或者 Ready。但是在这个空当中,任务应该处于什么状态呢?这就是 Swap 的作用了(但不知道为什么没有显式的作为状态机中的一个状态)。
  • DRIVER:这个就是标识某个任务是用户代码进程,从而将所有任务都统一来管理。

任务队列(TaskQueue)

Ray 将所有任务按状态(TaskState)聚集组织在一个个队列中, 这些队列即任务队列(TaskQueue)。每个队列定义了任务增加、删除和查找等基本操作。此外,还有一个重要的接口,就是获取该队列中所有任务所需资源的总和。比如说在调度某个任务时,想要知道某个节点对剩余可用资源,就需要用该节点的总资源,减去正在运行的任务的所需资源和就绪任务的所需资源(需要优先本地调度)。

值得一提的是,在删除任务的时候,如果 removed_tasks 参数不为空指针,则将删除的任务放到里面。这样如果多次删除,可以将任务收集到一个数组中。

还有一个比较冗余的点,即通过 task.GetTaskSpecification.TaskId() 可以获取到 task_id,不知道为什么还在 AppendTask 参数中增加 task_id 呢,为了一致性?

至于具体实现上,用了比较经典的链表+哈希方式组织。可以使得增删改查的时间都是O(1),获取全部任务的时间是 O(n)——遍历链表即可。

class TaskQueue {
public:
 virtual ~TaskQueue() {}
 // 任务的增删改查操作
 virtual bool AppendTask(const TaskID &task_id, const Task &task);
 virtual bool RemoveTask(const TaskID &task_id,
                         std::vector<Task> *removed_tasks = nullptr);
 bool HasTask(const TaskID &task_id) const;
 const std::list<Task> &GetTasks() const;
 const Task &GetTask(const TaskID &task_id) const;
 // 获取队列中所需资源总和
 const ResourceSet &GetCurrentResourceLoad() const;
protected:
 // 链表+哈希组织,可以快速查找O(1)和线性遍历O(n)
 std::list<Task> task_list_;
 std::unordered_map<TaskID, std::list<Task>::iterator> task_map_;
 // 所有任务所需资源总和
 ResourceSet current_resource_load_;
};

在此基础上针对 Ready 这个状态又造了个 ReadyQueue;主要是增加了 ResourceSet -> Task Ids 的映射:即增加了一个索引,将所有具有相同资源需求的就绪任务集合在一块。这样在进行调度(DispatchTasks)时,如果发现某个任务的资源需求本地节点不能满足,那么就跳过所有具有同样资源需求的任务,算是一个调度的优化(对应逻辑在NodeManager::DispatchTasks 中)。

调度队列(SchedulingQueue)

按状态集合上述任务队列,再加以不同队列之间的任务换入换出操作,则成为调度队列(SchedulingQueue)。当 Ray 发生不同事件时,驱动任务状态机内状态进行转移,即调用 SchedulingQueue 暴露的接口,将任务从一个状态队列移到另一个状态队列中,并且做一些上下文的转换工作,以此来实现任务的调度。

需要注意的是,每个节点会维护一个调度队列,存储本节点持有的所有任务。

class SchedulingQueue {
public:
 /// 创建一个空的调度队列,初始化各个状态对应的任务队列,就绪队列被单独拿出来用 ReadyQueue 做初始化。
 SchedulingQueue() : ready_queue_(std::make_shared<ReadyQueue>()) {
   for (const auto &task_state : {
            TaskState::PLACEABLE,
            TaskState::WAITING,
            TaskState::READY,
            TaskState::RUNNING,
            TaskState::INFEASIBLE,
            TaskState::WAITING_FOR_ACTOR_CREATION,
            TaskState::SWAP,
        }) {
     if (task_state == TaskState::READY) {
       task_queues_[static_cast<int>(task_state)] = ready_queue_;
    } else {
       task_queues_[static_cast<int>(task_state)] = std::make_shared<TaskQueue>();
    }
  }
}
 // 我觉得名字起得不好,他的实际操作是获取所有就绪任务资源需求之和
 ResourceSet GetResourceLoad() const;
 /// 单个任务的增删查
 bool HasTask(const TaskID &task_id) const;
 const Task &GetTaskOfState(const TaskID &task_id, TaskState task_state) const;
 bool RemoveTask(const TaskID &task_id, Task *removed_task, TaskState *removed_task_state = nullptr);
 // 按状态获取任务,对于就绪状态,还需要按资源进行聚集
 const std::unordered_map<ResourceSet, ordered_set<TaskID>> &GetReadyTasksWithResources() const;
 const std::list<Task> &GetTasks(TaskState task_state) const;
 // 一组任务的移来移去
 std::vector<Task> RemoveTasks(std::unordered_set<TaskID> &task_ids);
 void QueueTasks(const std::vector<Task> &tasks, TaskState task_state);
 void MoveTasks(std::unordered_set<TaskID> &tasks, TaskState src_state, TaskState dst_state);
 void FilterState(std::unordered_set<TaskID> &task_ids, TaskState filter_state) const;
 /// 这两个函数是按其他维度:Actor 和 Job 来获取一组任务
 std::unordered_set<TaskID> GetTaskIdsForJob(const JobID &job_id) const;
 std::unordered_set<TaskID> GetTaskIdsForActor(const ActorID &actor_id) const;
 /// 阻塞任务和用户进程增删改查
 const std::unordered_set<TaskID> &GetBlockedTaskIds() const;
 const std::unordered_set<TaskID> &GetDriverTaskIds() const;
 void AddBlockedTaskId(const TaskID &task_id);
 void RemoveBlockedTaskId(const TaskID &task_id);
 void AddDriverTaskId(const TaskID &task_id);
 void RemoveDriverTaskId(const TaskID &task_id);
 /// 用来调试和监控
 std::string DebugString() const;
 void RecordMetrics() const;
 /// 这个好像没啥用,都没实现
 ResourceSet GetReadyQueueResources() const;
private:
 /// 一个辅助函数,由于调度队列算是有两层索引 task state -> (task id -> task),
 /// 因此经常需要定位到某个状态对应的任务队列,进而获取其中的某个任务。
 const std::shared_ptr<TaskQueue> &GetTaskQueue(TaskState task_state) const;
 /// 两个辅助函数,用来在指定状态的任务队列中删除或者过滤任务的
 void RemoveTasksFromQueue(ray::raylet::TaskState task_state,
                           std::unordered_set<ray::TaskID> &task_ids,
                           std::vector<ray::Task> *removed_tasks);
 void FilterStateFromQueue(std::unordered_set<ray::TaskID> &task_ids,
                           TaskState task_state) const;
 // kNumTaskQueues 作用便在此,所有int值在其之前的状态都有对应的任务队列
 std::array<std::shared_ptr<TaskQueue>, static_cast<int>(TaskState::kNumTaskQueues)>
     task_queues_;
 // 调度时候,就绪队列用的比较多,就单独维护一个指针在此
 const std::shared_ptr<ReadyQueue> ready_queue_;
 // 这两个状态(blocked 和 driver)没有对应的任务队列,只是用集合来保存id
 std::unordered_set<TaskID> blocked_task_ids_;
 std::unordered_set<TaskID> driver_task_ids_;
};

从上面代码我们可以看出以下几点:

  1. 所有函数基本是围绕单个任务或者一组任务的增删改查而来的。
  2. 所有任务实际上按二层索引组织 task state -> (task id -> task);因此定位到一个任务需要先经过 task state 这一层,于是造了辅助函数来进行这层操作:GetTaskQueue。此外,还有大量的在不同任务队列间倒来倒去的辅助函数。
  3. 上面所说的 kNumTaskQueues 是一个假状态,它本质上是一个界标。将其转换为整形后,所有小于它的状态都是按任务队列组织任务,所有大于它的状态只是用集合来存了任务ID(blocked 任务和 driver 任务)。
  4. 对于就绪队列,有一些特殊的照顾,因为实际将就绪任务安排到某个 worker 执行时很大的一块调度内容。这些额外照顾包括:a. 单独给就绪队列维护了一个指针、 b. 提供获取就绪队列资源需求之和接口、 c. 提供按同样资源需求聚集所有就绪任务接口。
  5. 还有两个按照其他维度获取一组资源的接口:GetTaskIdsForJobGetTaskIdsForActor 可以分别根据给定 JobId 和 ActorId 来获取一组任务。

名词释义

Task Required Resources:任务资源需求或者任务资源约束,通过在函数上添加注解 ray.remote(num_cpus=xx, num_gpus=xx)  来指定。其中 GPU 还可以指定小数个,以使多个任务共享一个 GPU。

Task argument:任务输入或者任务参数。如果翻译为输入是相对任务来说的,如果翻译为参数,是相对任务所执行的函数参数来说的。

Object:这里翻译为了数据对象。

Object Store:基于内存的不可变对象存储,是分散在各个节点的节点内、进程间的共享存储。

Node,Machine:指的是组成集群的每个机器。如果非要区分的话,Node可能更偏重逻辑上的节点,Machine 更偏重逻辑节点所在的物理机。但是在 Ray 中他们是一一对应的,即一个机器只有一个节点。


本篇就先到这里,下一篇计划写写调度策略或者资源定义。

相关文章
|
8天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
28 2
|
8天前
|
存储 安全 Linux
Golang的GMP调度模型与源码解析
【11月更文挑战第11天】GMP 调度模型是 Go 语言运行时系统的核心部分,用于高效管理和调度大量协程(goroutine)。它通过少量的操作系统线程(M)和逻辑处理器(P)来调度大量的轻量级协程(G),从而实现高性能的并发处理。GMP 模型通过本地队列和全局队列来减少锁竞争,提高调度效率。在 Go 源码中,`runtime.h` 文件定义了关键数据结构,`schedule()` 和 `findrunnable()` 函数实现了核心调度逻辑。通过深入研究 GMP 模型,可以更好地理解 Go 语言的并发机制。
|
21天前
|
消息中间件 缓存 安全
Future与FutureTask源码解析,接口阻塞问题及解决方案
【11月更文挑战第5天】在Java开发中,多线程编程是提高系统并发性能和资源利用率的重要手段。然而,多线程编程也带来了诸如线程安全、死锁、接口阻塞等一系列复杂问题。本文将深度剖析多线程优化技巧、Future与FutureTask的源码、接口阻塞问题及解决方案,并通过具体业务场景和Java代码示例进行实战演示。
39 3
|
1月前
|
存储
让星星⭐月亮告诉你,HashMap的put方法源码解析及其中两种会触发扩容的场景(足够详尽,有问题欢迎指正~)
`HashMap`的`put`方法通过调用`putVal`实现,主要涉及两个场景下的扩容操作:1. 初始化时,链表数组的初始容量设为16,阈值设为12;2. 当存储的元素个数超过阈值时,链表数组的容量和阈值均翻倍。`putVal`方法处理键值对的插入,包括链表和红黑树的转换,确保高效的数据存取。
56 5
|
1月前
|
Java Spring
Spring底层架构源码解析(三)
Spring底层架构源码解析(三)
113 5
|
1月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
68 0
|
1月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
57 0
|
1月前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
61 0
|
1月前
|
安全 Java 程序员
Collection-Stack&Queue源码解析
Collection-Stack&Queue源码解析
83 0
|
1月前
|
XML Java 数据格式
Spring底层架构源码解析(二)
Spring底层架构源码解析(二)

推荐镜像

更多
下一篇
无影云桌面