【阅读原文】戳:KubeRay on ACK:更高效、更安全
ML Infra 是决定机器学习工作负载能否生产落地的关键一环。生产环境中的 ML Infra 需要面对各种工作负载,例如数据处理、模型训练、模型推理等,数据处理可以包括结构化数据与非结构化数据,模型训练可以包括预训练,微调。不同的工作负载对 ML Infra 具有不同的需求,例如数据处理对于 CPU 的需求更多,模型训练则更多使用 GPU,分布式训练或分布式推理还需要 Infra 提供 Gang 调度的能力。因此,一个灵活的 ML Infra 对于机器学习工作负载在生产环境中落地至关重要。阿里云容器服务 Kuberneters 版 [1](简称 ACK)(国际站链接 [2])以托管组件化的方式给客户提供快速搭建 Ray 集群的能力,并通过结合使用阿里云的调度,存储,日志与监控,给用户提供更佳使用体验。
1. Ray
Ray 诞生于 UC Berkeley RISELab(该实验室也是 Spark 的起源地),是一个分布式计算框架,提供非常灵活的 API,便于用户快速开发各类 AI 应用。用户能够直接将 Python 脚本提交到集群中,由 Ray 进行自动分布式执行。Ray 的架构可以分为三层:计算引擎 Ray Core,Ray AI Lib 以及 Ray 的部署方式。
1.1 Ray Core
Ray Core 是 Ray 的核心 API,类似于 Spark 中的 Spark Core 或者 Hadoop 框架中的 MapReduce。函数,类以及变量是本地 Python 编程中的核心要素,Ray Core 则提供 Task,Actor 以及 Object 与之一一对应。例如,通过 @ray.remote 装饰器即可将一个普通的 Python 函数或 Python 类转换成一个可以远程执行的函数。
1.2 Ray AI Lib
Ray 生态中包含了 Ray Data、Ray Train、Ray Tune、Ray Serve 等 AI 库,这些库底层包装了 Ray Core 允许客户高效利用 Ray 集群的分布式执行特性。
Ray 提供了 Ray Data 的代码库,支持 Map,Filter,增删减列等数据操作,允许数据科学家将数据处理过程定义为一个有向无环图并进行惰性分布式计算。Ray 在执行这些数据操作时会根据数据集的大小自动进行分片来提升处理效率。同时 Ray Data 支持 CSV,Parquet,Pandas,数据库等多种数据源,非常适合进行进行数据处理。
下面是一个简单的使用 Ray Data 进行数据处理的例子。例子中通过 read_csv 从 s3 中读取数据,通过 dataset 的 map 运算对数据并行处理,最后将处理过的数据写回文件。
import ray # Load a CSV dataset directly from S3 ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv") # Preview the first record ds.show(limit=1) from typing import Dict import numpy as np # Define a transformation to compute a "petal area" attribute def transform_batch(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]: vec_a = batch["petal length (cm)"] vec_b = batch["petal width (cm)"] batch["petal area (cm^2)"] = vec_a * vec_b return batch # Apply the transformation to our dataset transformed_ds = ds.map_batches(transform_batch) # View the updated schema with the new column # .materialize() will execute all the lazy transformations and # materialize the dataset into object store memory print(transformed_ds.materialize()) # Extract the first 3 rows as a batch for processing print(transformed_ds.take_batch(batch_size=3)) import os # Save the transformed dataset as Parquet files transformed_ds.write_parquet("/tmp/iris") # Verify the files were created print(os.listdir("/tmp/iris"))
Ray Serve 是一个可扩展的模型服务库,用于构建在线推理 API。Serve 与框架无关,用户可以使用一个统一的工具包来部署各种模型,从使用 PyTorch、TensorFlow 和 Keras 等框架构建的深度学习模型,到 Scikit-Learn 模型,再到任意的 Python 业务逻辑。它还为服务大型语言模型(LLM)提供了多种特性和性能优化,例如响应流式传输、动态请求批处理、多节点/多 GPU 服务等。Ray Serve 基于 Ray 构建,可以轻松扩展到多台机器,并提供灵活的调度支持,例如支持按比例分配 GPU 资源,从而实现资源共享,并以较低成本服务大量机器学习模型。
下面是一个 Ray Serve 的简单示例,示例将会再 Ray 集群中部署一个两副本的翻译模型,相比于不使用 RayServe,只需要在 Translator 类上增加一个 serve.deployment 的装饰器,并增加一个 __call__ 的成员函数,AI 开发者只需要很小的改动即可让现有的代码接入到 Ray 集群中。
# File name: serve_quickstart.py from starlette.requests import Request import ray from ray import serve from transformers import pipeline @serve.deployment(num_replicas=2, ray_actor_options={"num_cpus": 0.2, "num_gpus": 0}) class Translator: def __init__(self): # Load model self.model = pipeline("translation_en_to_fr", model="t5-small") def translate(self, text: str) -> str: # Run inference model_output = self.model(text) # Post-process output to return only the translation text translation = model_output[0]["translation_text"] return translation async def __call__(self, http_request: Request) -> str: english_text: str = await http_request.json() return self.translate(english_text) translator_app = Translator.bind()
除了 Ray Data 以及 Ray Serve 以外,Ray Train 能够帮助用户减少模型训练代码的复杂度,Ray Tune 能够帮助用户减少模型微调复杂度。这些库共同构建了一个覆盖机器学习全生命周期的自动化分布式执行框架。
1.3 Ray 的部署
Ray 支持以 VM 或者 Kubernetes 作为基础设施,在 VM 上,用户可以通过安装 Ray 并执行 ray start 的方式将节点加入 Ray 集群。而在 Kubernetes 上则需要通过 Kuberay 管理 Ray 集群。
2. KubeRay
KubeRay 是 Ray 在 Kubernetes 上进行部署和使用的解决方案,KubeRay 负责管理 Ray 集群以及关联的应用在 Kubernetes 上的生命周期。通过 KubeRay,客户可以轻松在一个已有的 Kubernetes 集群中部署一个 Ray 集群,或者提交一个 Ray 任务,亦或者是部署一个 Ray 编写的 Serve 应用。
KubeRay 为这三种用法分别提供了 RayCluster、RayJob 以及 RayServe 三种抽象。
RayCluster 代表一个在 Kubernetes 中运行的常驻的 Ray 集群,由 KubeRay 管理其 Head 节点、Worker 节点的创建以及自动扩缩容,用户可以通过包括 Head 节点端口的方式对外提供一个完整的 Ray 集群服务。
RayJob 代表一个需要在 Ray 集群中运行的任务,可以选择由一个已经在 Kubernetes 运行的 RayCluster 运行也可以选择由一个临时创建的 RayCluster 运行。KubeRay 负责创建该临时运行的 RayCluster,提交任务,并同步任务状态,在任务运行完成后进行 RayCluster 的回收。
RayServe 代表一个需要在 Ray 集群中运行的应用,同样可以选择由一个已经在 Kubernetes 运行的 RayCluster 运行也可以选择由一个临时创建的 RayCluster 运行。KubeRay 负责创建 RayCluster 并向其中部署该应用。通过将在 Kubernetes 中部署 RayServe,应用可以通过 Kubernetes 的 Service,Ingress 等能力实现高可用,无损升级等特性。
下面是一个在 Kubernetes 集群中创建一个 Ray 集群的命令,这条命令会创建一个带有一个 Head 节点和一个初始 Worker 节点的 Ray 集群,并打开 Ray 的 autoscaler 能力。集群运行完成后可以通过 Pod 的 8265 端口查看 Dashboard 以及管理集群任务。
cat <<EOF | kubectl apply -f - apiVersion: ray.io/v1 kind: RayCluster metadata: name: myfirst-ray-cluster namespace: default spec: suspend: false autoscalerOptions: env: [] envFrom: [] idleTimeoutSeconds: 60 imagePullPolicy: Always resources: limits: cpu: 2000m memory: 2024Mi requests: cpu: 2000m memory: 2024Mi securityContext: {} upscalingMode: Default enableInTreeAutoscaling: false headGroupSpec: rayStartParams: dashboard-host: 0.0.0.0 num-cpus: "0" serviceType: ClusterIP template: spec: containers: - image: rayproject/ray:2.36.1 imagePullPolicy: Always name: ray-head resources: limits: cpu: "4" memory: 4G requests: cpu: "1" memory: 1G workerGroupSpecs: - groupName: work1 maxReplicas: 1000 minReplicas: 0 numOfHosts: 1 rayStartParams: {} replicas: 1 template: spec: containers: - image: rayproject/ray:2.36.1 imagePullPolicy: Always name: ray-worker resources: limits: cpu: "4" memory: 4G requests: cpu: "4" memory: 4G EOF
KubeRay 将 Ray 框架与 Kubernetes 的进行了无缝对接,继承了 Kubernetes 的自动化基础设施,丰富生态,高可用,高可靠,高扩展性,丰富的设备管理机制以及多云迁移能力。
3. Ray on ACK
ACK 以托管组件的方式支持 KubeRay,托管 KubeRay 有以下优势:
安全性增强:ACK 托管的 KubeRay Operator 组件基础镜像、安全配置等经过加固,能够保证组件受攻击面最小化,同时保证组件不包含高位漏洞风险。
免运维:ACK 为托管的 KubeRay Operator 配置了自动 VPA,KubeRay Operator 组件的可用资源会在实际使用资源即将超出时自动扩容来保障业务持续可用。且 ACK 为用户省去了升级组件,修复组件 Bug 的烦恼。用户可以专注于业务开发。
高可用部署:ACK 保证 KubeRay Operator 分布在至少两个可用区,保证客户业务在遇到可用区级故障的情况下依然可用。
可观测性:ACK 集群自动对控制面 KubeRay Operator 的日志进行收集,用户可以通过控制面组件日志查看 KubeRay Operator 日志确认 Operator 的运行状态。
ACK 托管的 KubeRay Operator 的组件架构如下图所示:
ACK 集群支持通过节点池管理节点,可以通过节点池使用多种不同算力类型,包括智算场景下常用的灵骏算力,按量或包年包月的 ECS 算力,以及按秒计费,支持灵活伸缩弹性的 ACS 算力。确保不同客户的业务需求都可以在 ACK 集群上找到合适的算力类型。
本节中我们以一个实际的客户案例展示在 ACK 上使用 KubeRay 时的附加能力。该客户在整体使用 Ray 时主要遇到以下几个问题:
多个 RayJob 之间无法限制 Quota 的使用,也无法设定任务执行优先级,客户的高优任务无法保证高优运行。
客户集群会包含 RayServe 以及 RayJob 等多种负载类型,RayJob 是定时触发的,低频次的,但是需要大量资源,RayServe 是存在流量突发的,平时需要的资源量少,为了满足峰值资源,需要在集群中预留大量机器,对于客户成本带来较大压力。
客户采用每个 RayJob 都新建 RayCluster 运行的模式,每个 RayJob 结束之后就无法查看 Ray Dashboard。
最终客户通过组合使用队列,调度,HistoryServer 等能力实现了 Ray 业务的生产落地,并且获得了超过 40% 的云上成本节省。
4. 高效的调度策略
为了高效的使用多样化的算力,高级的调度策略必不可少。阿里云为客户提供了多种不同算力,这些算力的价格,可靠性,库存,性能以及计费模式均有不同,客户可以通过灵活组合使用不同种类的算力实现整体成本的最优配置。阿里云上通过 ResourcePolicy 帮助客户配置不同类型算力的使用,ResourcePolicy 为不同算力配置调度顺序,帮助客户更好的节约算力成本。
ResourcePolicy 可以配置当某种类型的算力耗尽,或者某种类型的算力上运行的 Pod 数量达到一定数量时开启使用下一级算力。进一步的还可以设置在某一级算力耗尽时尝试等待该类型算力释放资源,进一步提升已有资源的利用效率。以下是一个使用 ResourcePolicy 的最简单的示例,示例中设置了在包年包月的 ECS 实例资源不足的情况下使用按量付费的 ACS 实例,非常适合与 HPA 一起使用应对流量突发场景。
apiVersion: scheduling.alibabacloud.com/v1alpha1 kind: ResourcePolicy metadata: name: resourcepolicy-example namespace: default spec: selector: key1: value1 units: - resource: ecs - resource: eci
客户存在定时运行的以 RayJob 为主的数据处理任务以及长时间运行的以 RayServe 部署的机器学习推理业务。客户配置了 RayJob 业务优先使用包年包月的 ECS 算力,ECS 算力不够时使用 ACS 算力,RayServer 只是用包年包月的 ECS。配置包年包月的 ECS 算力满足 RayServe 的常驻需求,并预留一部分冗余确保 RayServe 创建的 RayCluster 扩容时可以快速启动。RayJob 定时启动时可以优先使用剩余空闲的 ECS 资源,并在 ECS 资源不足时补充使用 ACS 算力。ACS 算力是阿里云提供的 Serverless 算力类型,具有秒级弹性,按秒计费的特性,这样灵活的计费模式更适合运行定时数据处理任务这类短时间低频次的任务。整体配置模式如下图所示。
5. 配额及队列支持
企业在管理集群资源时面临的主要挑战是任务量庞大而资源有限。为解决这一问题,需要优先将资源分配给关键部门或个人,并保持高度的灵活性以随时调整资源分配。阿里云 ACK 通过 Kube Queue 向客户提供任务队列能力。通过配额以及队列,用户可以定义业务以及团队的资源分配保障额度及资源分配上限。
任务队列 Queue 具备将来自不同部门和团队的 RayJob 分配至相应队列的能力,在 ElasticQuotaTree 提交后,ack-kube-queue 会在集群中自动创建相应的 Queue 用于任务排队。每个叶子节点的资源配额对应于集群中的一个独立 Queue。当 RayJob 被提交到集群时,ack-kube-queue 会根据 RayJob 所在命名空间自动关联对应 Queue,自动将任务分配至对应的队列内,根据排队策略或者 Quota 来决定是否出队。
客户在集群中创建了如下图所示的配额树,视频团队 video 关联了 video 的 namespace,通过 min 和 max 配置资源配额,Kube Queue 会自动为此配额创建关联的 queue:root-algorithm-video。后续在 videonamespace 下的 RayJob(.spec.suspend 字段设为 True)提交后,会自动创建对应的 QueueUnit 资源对象,进入 root-algorithm-video 队列进行排队。每个队列当中的任务会按照 RayJob 中 Head Pod 的优先级以及任务的提交时间进行优先排队,从而保障有资源时优先供给给高优任务执行。
6. 阿里云监控集成
ACK 集群默认集成安装 Prometheus 能力,用户仅需提交 Pod Monitor 以及 Service Monitor 即可实现对 ACK 集群数据面的 RayCluster 的监控数据进行采集,并且可以通过统一的 Prometheus 界面查看所有 RayCluster 监控。
7. 阿里云日志集成
用户也可以通过添加标签的方式收集数据面的 RayCluster 的日志到阿里云的 SLS 中,相比于自建日志存储,阿里云的 SLS 支持对日志进行结构化的查询及分析,免去自建与维护成本,总体拥有成本(TCO)低于自建 50%,且提供 99.99% 的可用性 SLA 指标,具有更高的可靠性。
8. Ray HistoryServer
Ray 原生 Dashboard 仅在 Ray 集群运行时可用,集群终止后用户无法获取历史日志与监控数据。为解决此问题,ACK 提供了 Ray 集群的 HistoryServer。HistoryServer 能够提供对于当前正在运行的以及过往已经结束的 RayCluster 的 Dashboard 的访问。其中 HistoryServer 的 Dashboard 与 Ray 自身的 DashBoard 有一致的能力, Metric 监控自动对接阿里云 Arms 监控,无需客户自己搭建 promethus 和 granfa,如下图所示。
相关链接:
[1] 阿里云 ACK
[2] 国际站链接
我们是阿里巴巴云计算和大数据技术幕后的核心技术输出者。
获取关于我们的更多信息~