机器学习分布式框架Ray

简介: Ray是UC Berkeley RISELab推出的一个高性能分布式执行框架,它比Spark更具计算优势,部署简单,支持机器学习和深度学习的分布式训练。Ray包括节点(head和worker)、本地调度器、object store、全局调度器(GCS),用于处理各种分布式计算任务。它支持超参数调优(Ray Tune)、梯度下降(Ray SGD)、推理服务(Ray SERVE)等。安装简单,可通过`pip install ray`。使用时,利用`@ray.remote`装饰器将函数转换为分布式任务,通过`.remote`提交并用`ray.get`获取结果。5月更文挑战第15天

机器学习分布式框架Ray

1.什么是Ray

分布式计算框架大家一定都耳熟能详,诸如离线计算的Hadoop(map-reduce),spark, 流式计算的strom,Flink等。相对而言,这些计算框架都依赖于其他大数据组件,安装部署也相对复杂。

在python中,之前有分享过的Celery可以提供分布式的计算。今天和大家分享另外一个开源的分布式计算框架Ray。Ray是UC Berkeley RISELab新推出的高性能分布式执行框架,具有比Spark更优异的计算性能,而且部署和改造更简单,同时支持机器学习和深度学习的分布式训练,支持主流的深度学习框架(pytorch,tensorflow,keras等)

md-2021-09-07-17-46-27.png

2. Ray架构

Ray的架构参见最早发布的论文Ray: A Distributed Framework for Emerging AI Applications

md-2021-09-07-18-16-08.png

由上图可以Ray主要包括:

  • Node: 节点,主要是head和worker, head可以认为是Master,worker是执行任务的单元
    • 每个节点都有自己的本地调度器local scheduler
    • object store:一个内存对象存储,允许Node之间进行通信
  • scheduler: 有两个调度器,每个节点都有本地的调度器, 在提交任务时,Local Scheduler会判断是否需要提交给Global Scheduler分发给其他worker来执行。
  • GCS:全局状态控制记录了Ray中各种对象的状态信息,可以认为是meta数据,是Ray容错的保证

Ray适用于任何分布式计算的任务,包括分布式训练。笔者最近是用在大量的时间序列预测模型训练和在线预测上。

Ray目前库支持超参数调优Ray tune, 梯度下降Ray SGD,推理服务RaySERVE, 分布式数据Dataset以及分布式增强学习RLlib。还有其他第三方库,如下所示:

md-2021-09-07-19-12-34.png

3. 简单使用

3.1 安装部署

pip install --upgrade pip
# pip install ray
pip install ray == 1.6.0

# ImportError: cannot import name 'deep_mapping' from 'attr.validators'
# pip install attr == 19.1.0

3.2 单机使用

  • 简单例子
    Ray 通过@ray.remote装饰器使得函数变成可分布式调用的任务。通过函数名.remote方式进行提交任务,通过ray.get方式来获取任务返回值。单击情况下和多线程异步执行的方式类似。

      import time
      import ray
      ray.init(num_cpus = 4) # Specify this system has 4 CPUs.
    
      @ray.remote
      def do_some_work(x):
          time.sleep(1) # Replace this is with work you need to do.
          return x
    
      start = time.time()
      results = ray.get([do_some_work.remote(x) for x in range(4)])
      print("duration =", time.time() - start)
      print("results = ", results)
    
      # duration = 1.0107324123382568
      # results =  [0, 1, 2, 3]
    

    remote返回的对象的id 如ObjectRef(7f10737098927148ffffffff0100000001000000)。需要通过ray.get来获取实际的值, 需要注意的是ray.get是阻塞式的调用,不能[ray.get(do_some_work.remote(x)) for x in range(4)]

  • 注意小任务使用情况
    需要注意的是ray分布式计算在调度的时候需要发费额外的时间,如调度,进程间通信以及任务状态的更新等等,所以避免过小的任务。可以把小任务进行合并

      @ray.remote
      def tiny_work(x):
          time.sleep(0.0001) # Replace this is with work you need to do.
          return x
    
      start = time.time()
      result_ids = [tiny_work.remote(x) for x in range(100000)]
      results = ray.get(result_ids)
      print("duration =", time.time() - start)
    
  • ray.put
    ray.put() 把一个对象放到对象存储上,返回一个object id, 这个id可以在分布式机器上都可以调用,该操作为异步的。通过ray.get()可以是获取。

      num = ray.put(10)
      ray.get(num)
    
  • ray.wait
    如果任务返回多个结果,ray.get()会等所有结果都完成之后才会执行后续的操作。如果多个结果执行的耗时不同,此时短板在于最长的那个任务。

    这个时候可以采用ray.wait()方法,ray.wait()返回执行完毕的和未执行完毕的任务结果,执行完成的结果可以继续后续的操作

      import random
      @ray.remote
      def do_some_work(x):
          time.sleep(random.uniform(0, 4)) # Replace this is with work you need to do.
          return x
    
      def process_incremental(sum, result):
          time.sleep(1) # Replace this with some processing code.
          return sum + result
    
      start = time.time()
      result_ids = [do_some_work.remote(x) for x in range(4)]
      sum = 0
      while len(result_ids):
          done_id, result_ids = ray.wait(result_ids)
          sum = process_incremental(sum, ray.get(done_id[0]))
      print("duration =", time.time() - start, "\nresult = ", sum)
    
      # duration = 5.270821809768677 
      # result =  6
    

    md-2021-09-07-10-24-14.png

2.3 集群部署

Ray的架构遵循master-slave的模式。Head Node 可以认为是Master,其他的Node为worker。在集群部署时,Head Node需要首先启动ray start --head, 其他机器依次启动worker,注意需要指定head Node的地址确定关系,ray start --address 10.8.xx.3:6379

关闭服务,需要每一台机器执行 ray.stop

md-2021-09-07-11-17-49.png

# To start a head node.
#ray start --head --num-cpus=<NUM_CPUS> --num-gpus=<NUM_GPUS>
ray start --head --node-ip-address 10.8.xx.3 --port=6379


# To start a non-head node.
# ray start --address=<address> --num-cpus=<NUM_CPUS> --num-gpus=<NUM_GPUS>
ray start --address 10.8.xx.3:6379 --node-ip-address 10.8.xx.3 --num-cpus 10 --temp-dir={your temp path}

md-2021-09-07-13-37-40.png

  • 提交任务
    任何一台worker机器都可以提交任务, 先通过init连接Head Node就可以remote起来了。
      import ray
      ray.init(10.8.xx.3:6379)
    

3. 不同任务的例子

  • 任务依赖
    任务之间存在依赖关系,Ray和Spark一样也是通过生成DAG图的方式来确定依赖关系,确定可以并行跑的任务。如下图所示zeros是可以并行跑的。

    md-2021-09-07-10-23-42.png

``` python
import numpy as np
# Define two remote functions. Invocations of these functions create tasks
# that are executed remotely.

@ray.remote
def multiply(x, y):
    return np.dot(x, y)

@ray.remote
def zeros(size):
    return np.zeros(size)

# Start two tasks in parallel. These immediately return futures and the
# tasks are executed in the background.
x_id = zeros.remote((100, 100))
y_id = zeros.remote((100, 100))

# Start a third task. This will not be scheduled until the first two
# tasks have completed.
z_id = multiply.remote(x_id, y_id)

# Get the result. This will block until the third task completes.
z = ray.get(z_id)
print(z)
```
  • 有状态任务
    上面提到的任务都是无状态的(除依赖外),即任务之间都是无关系的。Ray也是支持有状态的任务成为Actor。常是在python class上加@ray.remote,ray会跟踪每个class内部状态的不同状态。

      @ray.remote
      class Counter(object):
          def __init__(self):
              self.n = 0
    
          def increment(self):
              self.n += 1
    
          def read(self):
              return self.n
    
      counters = [Counter.remote() for i in range(4)]
    
      # 不断的执行可以每个counter计数不断增加
      [c.increment.remote() for c in counters]
      futures = [c.read.remote() for c in counters]
      print(ray.get(futures))
      # [1, 1, 1, 1]
      # [11, 11, 11, 11]
    
  • map-reduce 任务
    map-reduce任务其实可以其他分布式任务是一样的。主要是各种聚合操作。Map-Reduce常规操作如下
    md-2021-09-07-13-55-07.png

  - word count例子见:https://github.com/ray-project/ray/blob/master/doc/examples/streaming/streaming.py

这里举一个简单的例子:
``` python
@ray.remote
def map(obj, f):
    return f(obj)
@ray.remote
def sum_results(*elements):
    return np.sum(elements)

items = list(range(100))
map_func = lambda i : i*2
remote_elements = [map.remote(i, map_func) for i in items]

# simple reduce
remote_final_sum = sum_results.remote(*remote_elements)
result = ray.get(remote_final_sum)

# tree reduce
intermediate_results = [sum_results.remote(
    *remote_elements[i * 20: (i + 1) * 20]) for i in range(5)]
remote_final_sum = sum_results.remote(*intermediate_results)
result = ray.get(remote_final_sum)

```
![md-2021-09-07-19-34-45.png](https://ucc.alicdn.com/pic/developer-ecology/abb7gqinvjggw_0b8ad35cdc774ab78d681c2c782ff884.png)
  • 训练模型如pytorch
    官网提供了Best Practices: Ray with PyTorch, 主要是下载训练/测试数据和训练多个模型(感觉不是很实用)。训练多个模型,可以进行参数融合。

    参见 https://docs.ray.io/en/latest/using-ray-with-pytorch.html

4. 总结

本文分享了高效的Python分布式计算框架Ray,希望对你有帮助。总结如下:

  • Ray是UC Berkeley RISELab新推出的高性能分布式执行框架, Spark也是伯克利出品的
  • Ray架构关键:两个调度器, Head和worker节点,GCS全局状态控制保证计算容错
  • Ray应用简单:@ray.remote把任务变成分布式任务, x.remote提交任务, get/wait获取结果
  • 集群不是:ray start
  • Ray支持多种任务:有依赖DAG,有状态Actor以及深度学习支持
  • 不断丰富的库:RaySERVE, RaySGD, RayTune, Ray data,rllib
相关实践学习
在云上部署ChatGLM2-6B大模型(GPU版)
ChatGLM2-6B是由智谱AI及清华KEG实验室于2023年6月发布的中英双语对话开源大模型。通过本实验,可以学习如何配置AIGC开发环境,如何部署ChatGLM2-6B大模型。
目录
相关文章
|
4月前
|
数据采集 自动驾驶 Java
PAI-TurboX:面向自动驾驶的训练推理加速框架
PAI-TurboX 为自动驾驶场景中的复杂数据预处理、离线大规模模型训练和实时智能驾驶推理,提供了全方位的加速解决方案。PAI-Notebook Gallery 提供PAI-TurboX 一键启动的 Notebook 最佳实践
|
7月前
|
数据采集 存储 数据可视化
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
619 0
分布式爬虫框架Scrapy-Redis实战指南
|
6月前
|
机器学习/深度学习 数据可视化 算法
Ray Flow Insight:让分布式系统调试不再"黑盒"
作为Ray社区的积极贡献者,我们希望将这些实践中沉淀的技术能力回馈给社区,推动Ray生态在实际场景中的应用深度和广度。因此,2024年底我们做了激活AntRay开源社区的决策,AntRay会始终保持与官方Ray版本强同步(即AntRay会紧随Ray官方社区版本而发布),内部Feature亦会加速推向AntRay以开源研发模式及时反哺内部业务,同时会将社区关注的Feature提交至Ray官方社区,实现内外部引擎双向价值流动。后续我们会以系列文章形式同步蚂蚁推向开源的新特性,本文将重点介绍:Ray Flow Insight —— 让分布式系统调试不再"黑盒"。
|
机器学习/深度学习 人工智能 算法
Post-Training on PAI (3):PAI-ChatLearn,PAI 自研高性能强化学习框架
人工智能平台 PAI 推出了高性能一体化强化学习框架 PAI-Chatlearn,从框架层面解决强化学习在计算性能和易用性方面的挑战。
|
4月前
|
机器学习/深度学习 人工智能 分布式计算
Post-Training on PAI (1):一文览尽开源强化学习框架在PAI平台的应用
Post-Training(即模型后训练)作为大模型落地的重要一环,能显著优化模型性能,适配特定领域需求。相比于 Pre-Training(即模型预训练),Post-Training 阶段对计算资源和数据资源需求更小,更易迭代,因此备受推崇。近期,我们将体系化地分享基于阿里云人工智能平台 PAI 在强化学习、模型蒸馏、数据预处理、SFT等方向的技术实践,旨在清晰地展现 PAI 在 Post-Training 各个环节的产品能力和使用方法,欢迎大家随时交流探讨。
|
5月前
|
机器学习/深度学习 人工智能 算法
PaperCoder:一种利用大型语言模型自动生成机器学习论文代码的框架
PaperCoder是一种基于多智能体LLM框架的工具,可自动将机器学习研究论文转化为代码库。它通过规划、分析和生成三个阶段,系统性地实现从论文到代码的转化,解决当前研究中代码缺失导致的可复现性问题。实验表明,PaperCoder在自动生成高质量代码方面显著优于基线方法,并获得专家高度认可。这一工具降低了验证研究成果的门槛,推动科研透明与高效。
373 19
PaperCoder:一种利用大型语言模型自动生成机器学习论文代码的框架
|
5月前
|
监控 Java 调度
SpringBoot中@Scheduled和Quartz的区别是什么?分布式定时任务框架选型实战
本文对比分析了SpringBoot中的`@Scheduled`与Quartz定时任务框架。`@Scheduled`轻量易用,适合单机简单场景,但存在多实例重复执行、无持久化等缺陷;Quartz功能强大,支持分布式调度、任务持久化、动态调整和失败重试,适用于复杂企业级需求。文章通过特性对比、代码示例及常见问题解答,帮助开发者理解两者差异,合理选择方案。记住口诀:单机简单用注解,多节点上Quartz;若是任务要可靠,持久化配置不能少。
494 4
|
6月前
|
机器学习/深度学习 算法 数据挖掘
PyTabKit:比sklearn更强大的表格数据机器学习框架
PyTabKit是一个专为表格数据设计的新兴机器学习框架,集成了RealMLP等先进深度学习技术与优化的GBDT超参数配置。相比传统Scikit-Learn,PyTabKit通过元级调优的默认参数设置,在无需复杂超参调整的情况下,显著提升中大型数据集的性能表现。其简化API设计、高效训练速度和多模型集成能力,使其成为企业决策与竞赛建模的理想工具。
179 12
PyTabKit:比sklearn更强大的表格数据机器学习框架
|
5月前
|
机器学习/深度学习 人工智能 自然语言处理
阿里云人工智能平台 PAI 开源 EasyDistill 框架助力大语言模型轻松瘦身
本文介绍了阿里云人工智能平台 PAI 推出的开源工具包 EasyDistill。随着大语言模型的复杂性和规模增长,它们面临计算需求和训练成本的障碍。知识蒸馏旨在不显著降低性能的前提下,将大模型转化为更小、更高效的版本以降低训练和推理成本。EasyDistill 框架简化了知识蒸馏过程,其具备多种功能模块,包括数据合成、基础和进阶蒸馏训练。通过数据合成,丰富训练集的多样性;基础和进阶蒸馏训练则涵盖黑盒和白盒知识转移策略、强化学习及偏好优化,从而提升小模型的性能。
|
7月前
|
人工智能 自然语言处理 算法
MT-MegatronLM:国产训练框架逆袭!三合一并行+FP8黑科技,大模型训练效率暴涨200%
MT-MegatronLM 是摩尔线程推出的面向全功能 GPU 的开源混合并行训练框架,支持多种模型架构和高效混合并行训练,显著提升 GPU 集群的算力利用率。
463 18

热门文章

最新文章