PyTorch并行与分布式(二)分布式通信包torch.distributed

简介: PyTorch并行与分布式(二)分布式通信包torch.distributed

 本文主要参考资料为 distributed communication package torch.distributed

Backends

  torch.distributed支持三个内置Backends(后端),每个后端都有不同的功能。下表显示了哪些函数可用于CPU / CUDA tensors。只有PyTorch实现的情况下,MPI才会支持CUDA

Backends that come with PyTorch

  PyTorch distributed包支持Linux(stable),MacOS (stable)和Windows (prototype)。Linux中,默认情况GlooNCCL的后端在Pytorch安装的时候就生成了。NCCL在使用cuda build的时候才会有。MPI是一个可选的后端,只在pytorch build源码的过程中才会有。(比如在安装了MPI的主机上build pytorch)。

警告:从 PyTorch v1.7 开始,Windows 对分布式软件包的支持仅涵盖与 Gloo 后台、FileStore 和 DistributedDataParallel 的collective communications(聚合通信)。因此,init_process_group()中的 init_method 参数必须指向一个文件。

这个只适用于本地和共享文件系统:

  1. Local file system, init_method=“file:///d:/tmp/some_file”
  2. Shared file system, init_method=“file://{machine_name}/{share_folder_name}/some_file”
    如果你直接传入一个存储参数,它必须是一个FileStore实例。

使用哪个backend

  那我们应该使用哪个后端?。

  1. 经验法则
  • 使用NCCL后端进行分布式GPU训练。
  • 使用Gloo后端进行分布式CPU训练。
  1. 具有InfiniBand互连的GPU主机
  • 使用NCCL,因为它是目前唯一支持InfiniBand和GPUDirect的后端。
  1. GPU主机与以太网互连
  • 使用NCCL,因为它目前提供最佳的分布式GPU训练性能,特别是对于多进程单节点或多节点分布式训练。如果您遇到NCCL的任何问题,请使用Gloo作为后备选项。(请注意,Gloo目前运行速度比GPU的NCCL慢。)
  1. 具有InfiniBand互连的CPU主机
  • 如果您的InfiniBand在IB上已启用IP,请使用Gloo,否则请使用MPI。我们计划在即将发布的版本中为Gloo添加InfiniBand支持。
  1. 具有以太网互连的CPU主机
  • 除非您有特殊原因要使用MPI,否则请使用Gloo。

基础的环境变量

  选择要使用的网络接口

  默认情况下,NCCL和Gloo后端都会尝试查找用于通信的网络接口。但是,并不总能保证这一点。因此,如果在后端遇到任何问题而无法找到正确的网络接口。您可以尝试设置以下环境变量(每个变量适用于其各自的后端):

  • NCCL_SOCKET_IFNAME, 比如 export NCCL_SOCKET_IFNAME=eth0
  • GLOO_SOCKET_IFNAME, 比如 export GLOO_SOCKET_IFNAME=eth0

  如果使用Gloo后端,可以使用逗号来分隔他们。像export GLOO_SOCKET_IFNAME=eth0,eth1,eth2,eth3。后端将会以循环的方式来调度他们。所有进程必须在这个变量中指定相同数量的接口。

  其他NCCL环境变量

  NCCL还提供了许多用于微调目的的环境变量

  常用的包括以下用于调试目的:

  • export NCCL_DEBUG=INFO
  • export NCCL_DEBUG_SUBSYS=ALL

  有关NCCL环境变量的完整列表,请参阅官方文档

基础

  torch.distributed支持在一台或者多台机器上的,pytorch的多进程并行节点计算。torch.nn.parallel.DistributedDataParallel()基于此,提供一个为pytorch module的同步分布式训练封装。与torch.nn.DataParallel()不同的是,它是基于多进程封装的。其支持多台机器的网络互联,用户必须为每个进程明确启动一个单独的主训练脚本副本。

  在单台机器上, torch.distributedtorch.nn.parallel.DistributedDataParallel()封装仍然比数据并行的方法torch.nn.DataParallel()具有优势。

  • 每个进程都维护自己的优化器,并在每次迭代时执行完整的优化步骤。虽然这可能看起来是多余的,但由于梯度已经聚集在一起并且在整个过程中平均,因此对于每个过程都是相同的,这意味着不需要参数广播步骤,减少了在节点之间传输张量所花费的时间。
  • 每个进程都包含一个独立的Python解释器,消除了额外的解释器开销和来自单个Python进程驱动多个执行线程,模型副本或GPUGIL-thrashing。这对于大量使用Python运行时的模型尤其重要,包括具有循环层或许多小组件的模型。

初始化

  在使用这个包的其他方法之前需要调用torch.distributed.init_process_group()来初始化。 这将使得所有的需要的进程都被加入进来。

方法一

torch.distributed.is_available()

  如果distributed包能够被使用,这将返回True

方法二

torch.distributed.init_process_group(backend, 
        init_method=None, 
        timeout=datetime.timedelta(0, 1800), 
        world_size=-1, 
        rank=-1, 
        store=None, 
        group_name='')

  初始化默认的分布式进程组,也将会初始化分布式包。

  1. backend:多机通信后端,包括mpi, gloo, 和nccl。 如果在每台机器上使用nccl后台的多个进程,每个进程必须对它使用的每一个GPU都有独占的访问权限,因为进程之间共享GPU会导致死锁。
  2. init_method:可选参数,URL设置,如何初始化进程组。如果未指定init_methodstore,则默认为env://,表示使用读取环境变量的方式进行初始化。该参数与store互斥。
  3. timeout:可选参数,进程组超时限制。默认为30分钟。
  4. world_size:可选参数,参与作业的进程数。
  5. rank:可选参数,当前进程节点编号。
  6. store:可选参数,用于存储交换地址信息。
  7. group_name:可选参数,进程组名字。

  如果要启动backend == Backend.MPI的话,Pytorch需要从源码上build,并支持MPI

TCP初始化

  有两种方法可以使用TCP进行初始化,这两种方法都需要一个所有进程都可以访问的网络地址和所需的world_size。这种方式需要手动为每个进程指定进程号。

请注意,最新的分布式软件包中不再支持多播地址。group_name也被弃用了。

import torch.distributed as dist
# Use address of one of the machines
dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456',
                        rank=args.rank, world_size=4)

说明

  不同进程内,均使用主进程的ip地址和port,确保每个进程能够通过一个master 进行协作。该 ip 一般为主进程所在的主机的 ip,端口号应该未被其他应用占用。

  实际使用时,在每个进程内运行代码,并需要为每一个进程手动指定一个rank,进程可以分布与相同或不同主机上。

  多个进程之间,同步进行。若其中一个出现问题,其他的也马上停止。

使用

  Node 1

python mnsit.py --init-method tcp://192.168.54.179:22225 --rank 0 --world-size 2

  Node 2

python mnsit.py --init-method tcp://192.168.54.179:22225 --rank 1 --world-size 2

底层实现

  在深入探讨初始化算法之前,先从C/C++层面,大致浏览一下 init_process_group背后发生了什么。

  1. 解析并验证参数
  2. 后端通过name2channel.at()函数进行解析,返回一个channel 类,将用于执行数据传输
  3. 丢弃GIL,并调用THDProcessGroupInit()函数,其实例化该 channel,并添加 master 节点的地址
  4. rank 0 对应的进程将会执行 master 过程,而其他的进程则作为 workers
  5. master
  6. 为所有的 worker创建 sockets
  7. 等待所有的worker连接
  8. 发送给他们所有其他进程的位置
  9. 每一个 worker
  10. 创建连接mastersockets
  11. 发送自己的位置信息
  12. 接受其他 workers 的信息
  13. 打开一个新的 socket,并与其他wokers进行握手信号
  14. 初始化结束,所有的进程之间相互连接

共享文件系统初始化

  另一种初始化方法就是利用文件系统,要求共享的文件对于组内所有进程可见。

import torch.distributed as dist
# rank should always be specified
dist.init_process_group(backend,
      init_method='file:///mnt/nfs/sharedfile',
      world_size=4, rank=args.rank)

说明

  其中,以file://为前缀,表示文件系统各式初始化。/mnt/nfs/sharedfile表示共享的文件,各个进程在共享文件系统中通过该文件进行同步或异步。因此,所有进程必须对该文件具有读写权限。

  每一个进程将会打开这个文件,写入自己的信息,并等待直到其他所有进程完成该操作。在此之后,所有的请求信息将会被所有的进程可访问,为了避免race conditions,文件系统必须支持通过 fcntl锁定(大多数的 local 系统和NFS均支持该特性)。

若指定为同一文件,则每次训练开始之前,该文件必须手动删除,但是文件所在路径必须存在!与 tcp 初始化方式一样,也需要为每一个进程手动指定 rank

使用

  在主机01上:

python mnsit.py --init-method file://PathToShareFile/MultiNode --rank 0 --world-size 2
• 1

  在主机02上:

python mnsit.py --init-method file://PathToShareFile/MultiNode --rank 1 --world-size 2
• 1

  这里相比于TCP的方式麻烦一点的是运行完一次必须更换共享的文件名,或者删除之前的共享文件,不然第二次运行会报错。

环境变量初始化

  该方法将从环境变量中读取配置,允许完全自定义信息的获取方式。需要设置的变量有:

  1. MASTER_PORT:必须指定,需为rank 0机器上的一个空闲端口。
  2. MASTER_ADDR:必须指定,主进程rank 0机器的地址。
  3. WORLD_SIZE:必须指定,总进程数,可以在这里设置也可以在之前的init函数中设置。
  4. RANK:当前进程的rank,也可以在init函数中指定。

  rank 0的机器需要建立所有的连接。

  • 使用实例:

  Node 1: (IP: 192.168.1.1, and has a free port: 1234)

>>> python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
           --nnodes=2 --node_rank=0 --master_addr="192.168.1.1"
           --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)

  Node 2

>>> python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
           --nnodes=2 --node_rank=1 --master_addr="192.168.1.1"
           --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)

分布式Key-Value Store

  distributed包自带一个分布式key-value存储,可以用来与不同的进程之间的通信,也可以在torch.distributed.init_process_group()中进行初始化。有三种Key-Value的存储方式:TCPStore, FileStore, 和HashStore

  更多的资料参考:https://pytorch.org/docs/stable/distributed.html

  这里没有写完,感兴趣的可以自己查看。

参考

相关实践学习
部署Stable Diffusion玩转AI绘画(GPU云服务器)
本实验通过在ECS上从零开始部署Stable Diffusion来进行AI绘画创作,开启AIGC盲盒。
相关文章
|
7月前
|
机器学习/深度学习 分布式计算 数据处理
分布式计算框架:并行力量的交响乐章
分布式计算框架如Apache Spark解决单机计算挑战,通过拆分任务到多机并行处理提升效率。Spark以其内存计算加速处理,支持批处理、查询、流处理和机器学习。以下是一个PySpark统计日志中每日UV的示例,展示如何利用SparkContext、map和reduceByKey进行数据聚合分析。这些框架的运用,正改变大数据处理领域,推动数据分析和机器学习的边界。【6月更文挑战第18天】
263 2
|
2月前
|
机器学习/深度学习 边缘计算 人工智能
第二届边缘计算与并行、分布式计算国际学术会议(ECPDC 2025) 2025 2nd international Conference on Edge Computing, Parallel and Distributed Computing
第二届边缘计算与并行、分布式计算国际学术会议(ECPDC 2025) 2025 2nd international Conference on Edge Computing, Parallel and Distributed Computing 机器学习 计算学习理论 数据挖掘 科学计算 计算应用 数字图像处理 人工智能
69 6
|
2月前
|
机器学习/深度学习 人工智能 分布式计算
【AI系统】分布式通信与 NVLink
进入大模型时代后,AI的核心转向大模型发展,训练这类模型需克服大量GPU资源及长时间的需求。面对单个GPU内存限制,跨多个GPU的分布式训练成为必要,这涉及到分布式通信和NVLink技术的应用。分布式通信允许多个节点协作完成任务,而NVLink则是一种高速、低延迟的通信技术,用于连接GPU或GPU与其它设备,以实现高性能计算。随着大模型的参数、数据规模扩大及算力需求增长,分布式并行策略,如数据并行和模型并行,变得至关重要。这些策略通过将模型或数据分割在多个GPU上处理,提高了训练效率。此外,NVLink和NVSwitch技术的持续演进,为GPU间的高效通信提供了更强的支持,推动了大模型训练的快
46 0
|
5月前
|
分布式计算 并行计算 大数据
NumPy 并行计算与分布式部署
【8月更文第30天】随着数据量的不断增长,传统的单机计算模型已经难以满足对大规模数据集处理的需求。并行和分布式计算成为了处理这些大数据集的关键技术。虽然 NumPy 本身并不直接支持并行计算,但可以通过结合其他库如 Numba 和 Dask 来实现高效的并行和分布式计算。
49 1
|
5月前
|
机器学习/深度学习 并行计算 PyTorch
PyTorch与DistributedDataParallel:分布式训练入门指南
【8月更文第27天】随着深度学习模型变得越来越复杂,单一GPU已经无法满足训练大规模模型的需求。分布式训练成为了加速模型训练的关键技术之一。PyTorch 提供了多种工具来支持分布式训练,其中 DistributedDataParallel (DDP) 是一个非常受欢迎且易用的选择。本文将详细介绍如何使用 PyTorch 的 DDP 模块来进行分布式训练,并通过一个简单的示例来演示其使用方法。
605 2
|
5月前
|
机器学习/深度学习 分布式计算 PyTorch
构建可扩展的深度学习系统:PyTorch 与分布式计算
【8月更文第29天】随着数据量和模型复杂度的增加,单个GPU或CPU已无法满足大规模深度学习模型的训练需求。分布式计算提供了一种解决方案,能够有效地利用多台机器上的多个GPU进行并行训练,显著加快训练速度。本文将探讨如何使用PyTorch框架实现深度学习模型的分布式训练,并通过一个具体的示例展示整个过程。
218 0
|
5月前
|
存储 异构计算
自研分布式训练框架EPL问题之通过strategy annotation实现流水并行如何解决
自研分布式训练框架EPL问题之通过strategy annotation实现流水并行如何解决
|
6月前
|
分布式计算 API 对象存储
Ray是一个开源的分布式计算框架,用于构建和扩展分布式应用。它提供了简单的API,使得开发者可以轻松地编写并行和分布式代码,而无需担心底层的复杂性。
Ray是一个开源的分布式计算框架,用于构建和扩展分布式应用。它提供了简单的API,使得开发者可以轻松地编写并行和分布式代码,而无需担心底层的复杂性。
1071 11
|
7月前
|
负载均衡 并行计算 Java
分布式系统中,利用并行和并发来提高整体的处理能力
分布式系统中,利用并行和并发来提高整体的处理能力
|
7月前
|
缓存 网络协议 Java
分布式系统详解--基础知识(通信)
分布式系统详解--基础知识(通信)
121 0