上一篇讲了待调度任务的组织形式,这一篇来继续挑软骨头啃:节点资源抽象和调度策略。
引子
由于 Ray 支持对任务进行显式的资源约束,因此需要对所有节点的资源进行硬件无关的抽象,将所有资源归一化管理,以在逻辑层面对资源进行增删。当有节点加入,需要感知其资源总量大小;当有任务调度,需要寻找满足约束节点;当任务调度成功,可以获取剩余可用资源等等。
Ray 除了对标准资源如 CPU,GPU 的支持,还支持对用户自定义 label 的资源的调度。用户在启动节点(ray start --resources <resources>
)指定该节点具有某种类别的资源(比如说 memory,bandwidth,某种型号的 GPU 等等)的总量,在定义 remote 函数时指定任务使用多少该类别的资源,Ray 的调度器在调度该任务时,就会按照用户自定义的资源需求将其调度到特定的机器上去。这是一种用户代码和调度器交互的一种有趣设计。
对于调度策略,由于 Ray 是去中心化的调度,很容易存在不一致状态。最简单的在实践中反而是统计最优的——对于每个任务找到符合资源约束的节点,随机选择一个,将任务调度过去。
调度资源抽象(SchedulingResources)
最基本的四个类是 FractionalResourceQuantity
、 ResourceSet
、 ResourceIds
和 ResourceIdSet
。各个类的特点概述一下:
FractionalResourceQuantity
定义了某种资源的量值ResourceSet
是一组不同种类资源及其量值的集合ResourceIds
对资源量按分数进行了标号——0, 1 … quantity-1 。ResourceIdSet
是一组标号后的资源的集合。
前两者是在多少层面上对资源进行描述,后两者是在索引层面对资源进行解构。
后两者是在前两者基础上的细化。他们都定义了单个量值和集合不同种类量值构成的集合量。
此外,很重要的一点是,在 FractionalResourceQuantity
名字中也有体现,Ray 支持小数量值,但是只支持纯小数量值。为什么会有这种设计呢?举个最简单的例子,GPU 很贵嘛,于是就想多个 Task 共用一个 GPU,以提高 GPU 的利用率。那么每个 Task 在定义 GPU 需求的时候,就可以写需要零点几个 GPU。在这种场景下,一点几个和二点几个的非纯小数值就没什么意思了,毕竟要么独占一个,要么与他人共享一个。
资源量值(FractionalResourceQuantity)
FractionalResourceQuantity
是对 double 的包装,表示 Ray 中对资源度量的量。但为了计算不损失精度,其内部实际实现为 64bit 的整型——实际值乘以 kResourceConversionFactor = 10000
取整。其目的很明显:
- 对于 Ray 的资源使用场景下,四五位小数左右的精度就够了
- 在这个精度内提供精确的运算
在此基础上重载了可度量的量的一些基本操作——加减运算和布尔运算。在 Ray 的场景下,只有节点加入(增加资源),判断是否可调度(比较资源)、调度任务(减小资源)等操作,因此乘除操作是不需要的。
当然也可以从另外一个角度来理解,或许更好理解一点,其内部表示将 量纲/单位 从逻辑的 1
,缩小为了 1/kResourceConversionFactor
。
class FractionalResourceQuantity { public: // 默认构造函数:resource_quantity_ = 0 FractionalResourceQuantity(); // 基本构造函数:指定资源量 FractionalResourceQuantity(double resource_quantity); /// 加减运算 const FractionalResourceQuantity operator+(const FractionalResourceQuantity &rhs) const; const FractionalResourceQuantity operator-(const FractionalResourceQuantity &rhs) const; void operator+=(const FractionalResourceQuantity &rhs); void operator-=(const FractionalResourceQuantity &rhs); /// 布尔运算 bool operator==(const FractionalResourceQuantity &rhs) const; bool operator!=(const FractionalResourceQuantity &rhs) const; bool operator<(const FractionalResourceQuantity &rhs) const; bool operator>(const FractionalResourceQuantity &rhs) const; bool operator<=(const FractionalResourceQuantity &rhs) const; bool operator>=(const FractionalResourceQuantity &rhs) const; /// 浮点型的实际值 double ToDouble() const; private: // 以 1/kResourceConversionFactor 为单位的资源量大小 int64_t resource_quantity_; };
这类运算实现的时候有个基本思想:尽量复用,即定义最小数量的正交操作,然后用这些操作来实现另外的操作。这样有两个好处:
- 代码简洁,因为复用了。
- 改动方便,将来如果要改变实现只需改变最基本的操作实现。
具体到本例子中的布尔操作集,首先定义等于和小于操作符作为基本操作集,然后以此实现其他几个操作符。后面 ResourceSet
中也有类似的思想:
// 两个基本操作 bool FractionalResourceQuantity::operator==(const FractionalResourceQuantity &rhs) const { return resource_quantity_ == rhs.resource_quantity_; } bool FractionalResourceQuantity::operator<(const FractionalResourceQuantity &rhs) const { return resource_quantity_ < rhs.resource_quantity_; } // 以下调用基本操作完成定义 bool FractionalResourceQuantity::operator!=(const FractionalResourceQuantity &rhs) const { return !(*this == rhs); } bool FractionalResourceQuantity::operator>(const FractionalResourceQuantity &rhs) const { return rhs < *this; } bool FractionalResourceQuantity::operator<=(const FractionalResourceQuantity &rhs) const { return !(*this > rhs); } bool FractionalResourceQuantity::operator>=(const FractionalResourceQuantity &rhs) const { return !(*this < rhs); }
资源集合(ResourceSet)
ResourceSet
是一系列不同种类的资源及其量值的集合,实现上是对字典(unordered_map
)包装。在物理意义上,一般用来表示一个节点的总资源量、已经使用的资源量、剩余可用的资源量等等。
基本操作包括对单个资源的增删,以及资源集合间的运算;详细见代码内注释。
class ResourceSet { public: ResourceSet(); // 三个构造函数: // 根据一个字典或者一个键值对列表构建 label(string)->amount(FractionalResourceQuantity) 的字典。 ResourceSet( const std::unordered_map<std::string, FractionalResourceQuantity> &resource_map); ResourceSet(const std::unordered_map<std::string, double> &resource_map); ResourceSet(const std::vector<std::string> &resource_labels, const std::vector<double> resource_capacity); // 析构函数 ~ResourceSet(); // 判断资源集合 A 是否为 B 的子集(A 中所有 label 的 amount 都不大于 B 中对应 label 的 amount) // 以该操作作为基本操作,可以实现接下来三个操作 bool IsSubset(const ResourceSet &other) const; // 下面函数中,前两个函数的实现一毛一样 bool operator==(const ResourceSet &rhs) const; bool IsEqual(const ResourceSet &other) const; bool IsSuperset(const ResourceSet &other) const; // 类似于字典的一些增删改查操作,即对某个种类的资源数量进行增删改查 void AddOrUpdateResource(const std::string &resource_name, const FractionalResourceQuantity &capacity); bool DeleteResource(const std::string &resource_name); FractionalResourceQuantity GetResource(const std::string &resource_name) const; bool IsEmpty() const; // 两个集合间的加减运算。需要注意的是,在增加的时候有时候不能超过一个上界:比如节点的资源总量大小,于是有了 // AddResourcesCapacityConstrained; 在减小的时候,资源量不能减小为负值,于是有了 // SubtractResourcesStrict。这是两个上下界保护的函数。 void AddResourcesCapacityConstrained(const ResourceSet &other, const ResourceSet &total_resources); void AddResources(const ResourceSet &other); void SubtractResources(const ResourceSet &other); void SubtractResourcesStrict(const ResourceSet &other); // 由于所有节点必然存在 CPU,所有任务调度时也必然需要 CPU 资源,因此单独拿出来作为一个函数。 // 其对应的 label name 为:kCPU_ResourceLabel = "CPU" const ResourceSet GetNumCpus() const; // 返回字典形式组织的资源列表和对应数量;一个是 double 形式表示,一个是 FractionalResourceQuantity // 形式表示 const std::unordered_map<std::string, double> GetResourceMap() const; const std::unordered_map<std::string, FractionalResourceQuantity> &GetResourceAmountMap() const; const std::string ToString() const; private: // 内部资源及其数量字典 std::unordered_map<std::string, FractionalResourceQuantity> resource_capacity_; };
集合间加减运算时,有两个额外的带上下界检查的函数。应该是为了避免小数不精确运算导致的后果?
资源标号(ResourceIds)
ResourceIds
解决的问题是为某种资源打上标号,并且以某种方式拆分资源。
对于资源标号,即给系统所有资源打上一个逻辑 ID(0~n-1)。比如说 GPU 0, GPU 1 等等。以使用户代码能够对资源进行定位,从而要求某段代码具体使用某个资源。
对于资源拆分,Ray 要求 API (ray.remote(label=amount))只能以两种形式使用资源:
- amount >= 1 且必须是整数。
- amount < 1,即是纯小数。
对应到物理意义上,即要么独占一到多个整份资源,要么和其他人共享单份资源。前者的经典例子是 CPU,后者经典例子是 GPU。
在内部实现上,ResourceIds
维护了两个列表。一个列表是整整型列表(vector<int64_t> whole_ids_
)代表所有的整数份资源的 ID 列表。一个列表是键值对列表(vector<pair<int64_t, FractionalResourceQuantity>>
),代表所有小数份资源 ID 及其对应的剩余份数。值得一提的是,对于一个节点,初始来说应该都是整数份资源(除非有某种特殊用途,比如不想让集群用满该节点资源啦)。然后随着需要小数份资源的任务的调度,一部分资源被切分,实现上表现为从整份资源列表中拿出一个资源,切分后,分出去一块给任务,剩下的放到小数份资源列表中。因此,两个列表中不会有相同的 ID,因为每个 ID 都最多对应一整份资源。如果由于任务完成,导致某些小数份资源释放,使得小数份资源列表中的具有同样 ID,这样的资源在还回时候会被合并,如果等于1之后,就会被拿到整数份资源列表中。
在资源分配的时候有些小原则。比如说要求小数份资源,我们优先去小数份资源列表里去找符合要求的,不能满足要求的话再去整数份资源列表中拆。
拆分的另一个问题是,我们不能将属于两个 ID 的两个小数份资源(比如说标号0 的有 0.5 份,标号 1 的有 0.5份)合到一块分配给一个要求较大的资源任务(比如说一个要求 0.75 份资源的任务)。举个例子来说,有两个 GPU 还剩一半用量,你不能将他们合起来分配给一个要求 0.75 份 GPU的任务。
还有一个变量decrement_backlog_用来记录所有超额资源请求。等待其他人Release了,会优先满足这些请求。
class ResourceIds { public: ResourceIds(); // 通过给定份数的资源构建 ResourceIds。resource_quantity 要么是个整数,要么是纯小数 explicit ResourceIds(double resource_quantity); // 在内部实现上,整数份的资源列表用 vector<int> 来表示所有对应的 ID,每个具有一整份资源。 // 对于小数份资源,使用一个 vector<pair> 来表示某个 ID 和其对应的资源量值 // 因此有以下三个构造函数。 explicit ResourceIds(const std::vector<int64_t> &whole_ids); explicit ResourceIds( const std::vector<std::pair<int64_t, FractionalResourceQuantity>> &fractional_ids); ResourceIds( const std::vector<int64_t> &whole_ids, const std::vector<std::pair<int64_t, FractionalResourceQuantity>> &fractional_ids); // 看是否有足够的要求的 resource_quantity 份资源。如果 resource_quantity 是整数,则只需看所有 // 整数份资源列表。需要注意的是如果 resource_quantity 是小数,那么必须要有单个 ID 的资源量大于 // resource_quantity 才可以(或者有不小于一份的整数资源,或者有大于resource_quantity的小数) // 而不能将两个小数凑在一块去大于 resource_quantity。因为分属于两个资源 ID 的量不能合到一块 bool Contains(const FractionalResourceQuantity &resource_quantity) const; // 根据上面的原则切下来一块资源 或者 回收一块资源。 // 分配资源时,适当地进行拆分 // 回收资源时,适当地进行合并。 ResourceIds Acquire(const FractionalResourceQuantity &resource_quantity); void Release(const ResourceIds &resource_ids); // 虽然语义不一样,但是和 Release 实现一样:将两个 resource_ids 加和 ResourceIds Plus(const ResourceIds &resource_ids) const; // 获取整数份/小数份资源 ID 列表 const std::vector<int64_t> &WholeIds() const; const std::vector<std::pair<int64_t, FractionalResourceQuantity>> &FractionalIds() const; // 看是不是该ID集合中没有任何资源 bool TotalQuantityIsZero() const; // 所有资源加和以 FractionalResourceQuantity 形式返回 FractionalResourceQuantity TotalQuantity() const; std::string ToString() const; // 通过 IncreaseCapacity 和 DecreaseCapacity 更新到指定资源量;这个是为了满足用户对 // 对自定义资源动态调整而做的。 void UpdateCapacity(int64_t new_capacity); private: // 判断 resource_quantity 是不是一个整数 bool IsWhole(double resource_quantity) const; void IncreaseCapacity(int64_t increment_quantity); void DecreaseCapacity(int64_t decrement_quantity); // 两个列表 std::vector<int64_t> whole_ids_; std::vector<std::pair<int64_t, FractionalResourceQuantity>> fractional_ids_; // 追踪总量,总量即用 FractionalResourceQuantity 表示,也说明了该类是 FractionalResourceQuantity // 的细化 FractionalResourceQuantity total_capacity_; // 暂时性记下超额资源请求 int64_t decrement_backlog_; };
资源标号集合(ResourceIdSet)
ResourceIdSet 表示一组带标号的可用资源的集合。实现上用了一个字典unordered_map<string, ResourceIds> available_resources_ ,表示资源种类到其数量(标号过的)映射,并在其上定义了和 ResourceIds差不多的接口。
class ResourceIdSet { public: // 各种构造函数,就是构建 unordered_map ResourceIdSet(); ResourceIdSet(const ResourceSet &resource_set); ResourceIdSet(const std::unordered_map<std::string, ResourceIds> &available_resources); // 是否包含,索要和放回。和 ResourceIds 对应操作语义相同,只不过由单类资源变为了一组总资源。 bool Contains(const ResourceSet &resource_set) const; ResourceIdSet Acquire(const ResourceSet &resource_set); void Release(const ResourceIdSet &resource_id_set); void ReleaseConstrained(const ResourceIdSet &resource_id_set, const ResourceSet &resources_total); // 与 Release 实现一样 ResourceIdSet Plus(const ResourceIdSet &resource_id_set) const; /// 对于某类资源数量的增删查 // 给某类资源增加指定数量 void AddOrUpdateResource(const std::string &resource_name, int64_t capacity); // 删除某类资源 void DeleteResource(const std::string &resource_name); // 清空所有资源 void Clear(); // 获取所有可用资源,就是返回内部的散列表 const std::unordered_map<std::string, ResourceIds> &AvailableResources() const; // cpu 所有任务都得用,所以单独拿出来 ResourceIdSet GetCpuResources() const; // 将带标号的资源集合转变为只有数量描述的资源集合 ResourceSet ToResourceSet() const; // 打印和序列化 std::string ToString() const; std::vector<rpc::ResourceIdSetInfo> ToProtobuf() const; private: // 从资源种类到带标号的资源集合的映射 std::unordered_map<std::string, ResourceIds> available_resources_; };
调度资源类(SchedulingResource)
该类是最终对外负责的类,记录了某个节点上所有可供调度或者使用中的资源信息(resources_total_
),待使用的资源信息(resources_load_
)以及剩余可用的资源(resources_available_
)。上面三个字段皆为 ResourceIdSet
类型。
三者关系为:
resources_total_ = resouces_used_by_running_tasks + resources_available_
resources_load_ is part of resources_available_
第二个关系可能看起来比较奇怪,后面会详细讲。
class SchedulingResources { public: // 默认构造函数和指定总量的构造函数 SchedulingResources(); SchedulingResources(const ResourceSet &total); ~SchedulingResources(); // 总量,负载量和可用量的 getter const ResourceSet &GetAvailableResources() const; const ResourceSet &GetLoadResources() const; const ResourceSet &GetTotalResources() const; // 负载量和可用量的 setter,总量在构造时确定 void SetLoadResources(ResourceSet &&newset); void SetAvailableResources(ResourceSet &&newset); // 获取或者释放一组资源:在可用资源量上做增删 void Release(const ResourceSet &resources); void Acquire(const ResourceSet &resources); // 动态调整节点资源总量:更新某种种类的资源总量 or 删除某种种类资源 void UpdateResource(const std::string &resource_name, int64_t capacity); void DeleteResource(const std::string &resource_name); std::string DebugString() const; private: ResourceSet resources_total_; ResourceSet resources_available_; ResourceSet resources_load_; };
有意思的是,单从该源码来看, Release
和 Require
只对 resources_available_
进行了操作;而 resources_load_
只有整体 set 和 get 的操作,当然也可以通过 GetLoadResources
获取其引用后,直接对其进行加减。
结合其他源码思忖了一下,Ray 似乎想用 resources_load_
描述所有 SchedulingQueue::ready_queue_
需求总量,而非正在运行的任务的需求总量。正在运行的任务需求量应为 resources_total_ - resources_available_
。也就是说 resources_load_
是 resources_available_
的一部分,用来描述所有准备好的任务的资源需求总量。
作为一个典型的实现代表,贴一下 UpdateResource
的代码实现,该操作是对某类资源总量的更新;
void SchedulingResources::UpdateResource(const std::string &resource_name, int64_t capacity) { const FractionalResourceQuantity new_capacity = FractionalResourceQuantity(capacity); const FractionalResourceQuantity ¤t_capacity = resources_total_.GetResource(resource_name); if (current_capacity > 0) { // 如果该类资源存在,则将其总容量以及可用量进行相应更新 const FractionalResourceQuantity capacity_difference = new_capacity - current_capacity; const FractionalResourceQuantity ¤t_available_capacity = resources_available_.GetResource(resource_name); FractionalResourceQuantity new_available_capacity = current_available_capacity + capacity_difference; if (new_available_capacity < 0) { new_available_capacity = 0; } resources_total_.AddOrUpdateResource(resource_name, new_capacity); resources_available_.AddOrUpdateResource(resource_name, new_available_capacity); } else { // 如果不存在,则直接添加 resources_total_.AddOrUpdateResource(resource_name, new_capacity); resources_available_.AddOrUpdateResource(resource_name, new_capacity); } }
调度策略(SchedulingPolicy)
前面提到,Ray 使用去中心化的调度策略,即每个节点独立的对自己所看到的任务进行调度。SchedulingPolicy
就是描述单个节点的调度策略的,它通过构造函数拿到上一篇文章中提到的 SchedulingQueue
引用 ,从而拿到本节点所有的任务,然后通过 GCS 获取一组节点的资源概况(本节点的通过配置加载,对于其他节点,在感知到其加入集群的时候,从 GCS 中拉取),以 unordered_map<ClientID, SchedulingResources> &cluster_resources
表示。从而根据任务资源需求与节点资源存量的适配情况,进行调度决策。
此外,还有个 SpillOver
方法,其中 Schedule
方法是针对所有状态为 TaskState::PLACEABLE
的任务在一组节点中进行决策,所谓调度;SpillOver
方法是针对所有状态为 TaskState::INFEASIBLE
和 TaskState::READY
的任务在新加入的单个节点进行尝试,所谓挤出。只是后来随着本地资源也可以动态调整,也会在本地资源调整后使用此策略。
class SchedulingPolicy { public: // 构造函数:拿到本节点所有任务的引用 SchedulingPolicy(const SchedulingQueue &scheduling_queue); // 根据集群资源分布以及任务资源需求做调度决策,返回任务与其所调度到的节点的集合 std::unordered_map<TaskID, ClientID> Schedule( std::unordered_map<ClientID, SchedulingResources> &cluster_resources, const ClientID &local_client_id); // 在感知到新节点加入或者本地资源动态调整之后,对原先不可放置的任务进行尝试,并且匀出 // 至多一个 READY 的任务到新节点。 std::vector<TaskID> SpillOver(SchedulingResources &remote_scheduling_resources) const; virtual ~SchedulingPolicy(); private: // 任务队列的引用 const SchedulingQueue &scheduling_queue_; /// 一个随机种子 std::mt19937_64 gen_; };
Schedule
对于Schedule
函数,大概伪码如下:
for task in placeable_tasks_list: clients = find_all_available_resources_statisfied_clients() # available not include load if cliens.is_not_empty(): decision[task] = random_select_one(clients) else: clients = find_all_total_resources_statisfied_clients() # node whole resource if clients.is_not_empty(): decision[task] = random_select_one(clients) return decision
其中有两个值得注意的点:
- 对于每个任务,会按次序对所有节点筛选两遍。第一次针对每个节点的 真正可用 (
resources_available_ - resources_load_
)资源,第二次是针对节点所有资源(resources_total
)。 - 虽然注释里写着TODO:按权重进行节点选择。但是注释过去一年多了,现在代码中的策略仍然是对满足资源要求的节点集合随机选择一个节点,将任务调度过去。我猜其中有个可能的原因是在去中心化的调度决策下,一致性很难保证,随机选择反而能取得更好的性能。举个例子,如果按空闲资源量作为权重进行节点选择,如果某个节点加入了,那么剩余节点在调度的时候可能一哄而上的将任务调度到该节点上,造成新加入的很快过载,然后该节点再将过载的任务调度出去,从而来回拉风车式调度。
其中对于资源增删的操作稍稍复杂一些,贴在这里:
for (const auto &t : scheduling_queue_.GetTasks(TaskState::PLACEABLE)) { const auto &spec = t.GetTaskSpecification(); const auto &resource_demand = spec.GetRequiredPlacementResources(); const TaskID &task_id = spec.TaskId(); std::vector<ClientID> client_keys; for (const auto &client_resource_pair : cluster_resources) { ClientID node_client_id = client_resource_pair.first; const auto &node_resources = client_resource_pair.second; ResourceSet available_node_resources = ResourceSet(node_resources.GetAvailableResources()); // 1. 获取节点真正可用资源(resources_available_ - resources_load_)。 available_node_resources.SubtractResources(node_resources.GetLoadResources()); // 检测资源约束 if (resource_demand.IsSubset(available_node_resources)) { client_keys.push_back(node_client_id); } } if (!client_keys.empty()) { // 随机选一个 index std::uniform_int_distribution<int> distribution(0, client_keys.size() - 1); int client_key_index = distribution(gen_); const ClientID &dst_client_id = client_keys[client_key_index]; decision[task_id] = dst_client_id; // 2. 更新对应节点负载资源 ResourceSet new_load(cluster_resources[dst_client_id].GetLoadResources()); new_load.AddResources(resource_demand); cluster_resources[dst_client_id].SetLoadResources(std::move(new_load)); } ...
SpillOver
该函数比较简单,伪码如下:
def spill_over(remote_scheduling_resources): decision = [] new_load = ResourceSet() # 尝试原先不可放置的任务 for task in infeasible_task_list: if task.required_res.is_subset_of(remote_scheduling_resources): decision.append(task.id) new_load.add(task.required_res) # 匀出至多一个 ready 任务 for task in ready_task_list: if task.required_res.is_subset_of(remote_scheduling_resources): decision.append(task.id) new_load.add(task.required_res) break # 设置其节点资源负载 remote_scheduling_resources.set_load(new_load) return decision
该函数开始时应对的场景是,当感知到一个新节点上线时,会检测本机的某些任务能不能被调度过去。包括不可放置的任务(该节点上线前没有满足该任务资源需求的节点)和至多一个准备好的任务,我猜测这么干是为了弥补随机调度的不足,当一个新节点上线时,其他所有节点都将自己的任务匀给它一个(这个策略也比较有意思哈),以使得负载相对缓慢的从其他节点转移到新加入的节点。
后来随着版本迭代,节点静态资源变成动态资源。如果一个节点在启动时,通过配置加载其拥有的资源总量,此后维持不变,是为静态;如果在运行时资源总量仍然可设置,则为动态。在这种设计下,如果本节点资源总量被重新设置,那么也可能会调用此函数,对不可放置任务进行再尝试。至于匀任务这个操作,在此情景下,其实没什么意义。
最后,不要忘记的是,需要给被调度的节点设置资源负载,进行"占坑",以使得其他调度决策及时感知到到本次调度所带来的节点资源负载变化。
名词解释
- 逻辑和实现:逻辑表示类对外的抽象;实现表示类在内部的实际组织。
- resouce_label/resource_name: 或者说资源名称,标记某一种类的资源的标记,比如 GPU,CPU,Memory 等等
- ResourceId: 资源标号,给所有资源按照 0, 1, … , n-1 打上标记,以对某个资源进行索引。典型的如 GPU0, GPU1 ..
- 静态和动态资源:这是针对节点资源总量来说的,如果一个节点在启动时通过配置加载其拥有的资源总量,此后维持不变,是为静态;如果在运行时资源总量仍然可设置,则为动态