本文主要参考资料为 distributed communication package torch.distributed
Backends
torch.distributed
支持三个内置Backends
(后端),每个后端都有不同的功能。下表显示了哪些函数可用于CPU / CUDA tensors
。只有PyTorch
实现的情况下,MPI
才会支持CUDA
。
Backends that come with PyTorch
PyTorch distributed
包支持Linux
(stable),MacOS
(stable)和Windows
(prototype)。Linux
中,默认情况Gloo
和NCCL
的后端在Pytorch安装的时候就生成了。NCCL在使用cuda build的时候才会有。MPI是一个可选的后端,只在pytorch build源码的过程中才会有。(比如在安装了MPI的主机上build pytorch)。
警告:从 PyTorch v1.7 开始,Windows 对分布式软件包的支持仅涵盖与 Gloo 后台、FileStore 和 DistributedDataParallel 的collective communications(聚合通信)。因此,init_process_group()中的 init_method 参数必须指向一个文件。
这个只适用于本地和共享文件系统:
Local file system
, init_method=“file:///d:/tmp/some_file”Shared file system
, init_method=“file://{machine_name}/{share_folder_name}/some_file”
如果你直接传入一个存储参数,它必须是一个FileStore实例。
使用哪个backend
那我们应该使用哪个后端?。
- 经验法则
- 使用
NCCL
后端进行分布式GPU
训练。 - 使用
Gloo
后端进行分布式CPU
训练。
- 具有InfiniBand互连的GPU主机
- 使用NCCL,因为它是目前唯一支持InfiniBand和GPUDirect的后端。
- GPU主机与以太网互连
- 使用
NCCL
,因为它目前提供最佳的分布式GPU
训练性能,特别是对于多进程单节点或多节点分布式训练。如果您遇到NCCL
的任何问题,请使用Gloo
作为后备选项。(请注意,Gloo目前运行速度比GPU的NCCL慢。)
- 具有InfiniBand互连的CPU主机
- 如果您的InfiniBand在IB上已启用IP,请使用Gloo,否则请使用MPI。我们计划在即将发布的版本中为Gloo添加InfiniBand支持。
- 具有以太网互连的CPU主机
- 除非您有特殊原因要使用MPI,否则请使用Gloo。
基础的环境变量
选择要使用的网络接口
默认情况下,NCCL和Gloo后端都会尝试查找用于通信的网络接口。但是,并不总能保证这一点。因此,如果在后端遇到任何问题而无法找到正确的网络接口。您可以尝试设置以下环境变量(每个变量适用于其各自的后端):
- NCCL_SOCKET_IFNAME, 比如 export
NCCL_SOCKET_IFNAME=eth0
- GLOO_SOCKET_IFNAME, 比如 export
GLOO_SOCKET_IFNAME=eth0
如果使用Gloo后端,可以使用逗号来分隔他们。像export GLOO_SOCKET_IFNAME=eth0,eth1,eth2,eth3
。后端将会以循环的方式来调度他们。所有进程必须在这个变量中指定相同数量的接口。
其他NCCL环境变量
NCCL还提供了许多用于微调目的的环境变量
常用的包括以下用于调试目的:
export NCCL_DEBUG=INFO
export NCCL_DEBUG_SUBSYS=ALL
有关NCCL环境变量的完整列表,请参阅官方文档
基础
torch.distributed
支持在一台或者多台机器上的,pytorch的多进程并行节点计算。torch.nn.parallel.DistributedDataParallel()
基于此,提供一个为pytorch module
的同步分布式训练封装。与torch.nn.DataParallel()
不同的是,它是基于多进程封装的。其支持多台机器的网络互联,用户必须为每个进程明确启动一个单独的主训练脚本副本。
在单台机器上, torch.distributed
或torch.nn.parallel.DistributedDataParallel()
封装仍然比数据并行的方法torch.nn.DataParallel()
具有优势。
- 每个进程都维护自己的优化器,并在每次迭代时执行完整的优化步骤。虽然这可能看起来是多余的,但由于梯度已经聚集在一起并且在整个过程中平均,因此对于每个过程都是相同的,这意味着不需要参数广播步骤,减少了在节点之间传输张量所花费的时间。
- 每个进程都包含一个独立的
Python
解释器,消除了额外的解释器开销和来自单个Python
进程驱动多个执行线程,模型副本或GPU
的GIL-thrashing
。这对于大量使用Python运行时的模型尤其重要,包括具有循环层或许多小组件的模型。
初始化
在使用这个包的其他方法之前需要调用torch.distributed.init_process_group()
来初始化。 这将使得所有的需要的进程都被加入进来。
方法一:
torch.distributed.is_available()
如果distributed
包能够被使用,这将返回True
。
方法二:
torch.distributed.init_process_group(backend, init_method=None, timeout=datetime.timedelta(0, 1800), world_size=-1, rank=-1, store=None, group_name='')
初始化默认的分布式进程组,也将会初始化分布式包。
backend
:多机通信后端,包括mpi
,gloo
, 和nccl
。 如果在每台机器上使用nccl
后台的多个进程,每个进程必须对它使用的每一个GPU
都有独占的访问权限,因为进程之间共享GPU
会导致死锁。init_method
:可选参数,URL
设置,如何初始化进程组。如果未指定init_method
及store
,则默认为env://
,表示使用读取环境变量的方式进行初始化。该参数与store
互斥。timeout
:可选参数,进程组超时限制。默认为30
分钟。world_size
:可选参数,参与作业的进程数。rank
:可选参数,当前进程节点编号。store
:可选参数,用于存储交换地址信息。group_name
:可选参数,进程组名字。
如果要启动backend == Backend.MPI
的话,Pytorch
需要从源码上build
,并支持MPI
。
TCP初始化
有两种方法可以使用TCP
进行初始化,这两种方法都需要一个所有进程都可以访问的网络地址和所需的world_size
。这种方式需要手动为每个进程指定进程号。
请注意,最新的分布式软件包中不再支持多播地址。group_name也被弃用了。
import torch.distributed as dist # Use address of one of the machines dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456', rank=args.rank, world_size=4)
说明
不同进程内,均使用主进程的ip
地址和port
,确保每个进程能够通过一个master
进行协作。该 ip
一般为主进程所在的主机的 ip
,端口号应该未被其他应用占用。
实际使用时,在每个进程内运行代码,并需要为每一个进程手动指定一个rank
,进程可以分布与相同或不同主机上。
多个进程之间,同步进行。若其中一个出现问题,其他的也马上停止。
使用
Node 1
python mnsit.py --init-method tcp://192.168.54.179:22225 --rank 0 --world-size 2
Node 2
python mnsit.py --init-method tcp://192.168.54.179:22225 --rank 1 --world-size 2
底层实现
在深入探讨初始化算法之前,先从C/C++
层面,大致浏览一下 init_process_group
背后发生了什么。
- 解析并验证参数
- 后端通过
name2channel.at()
函数进行解析,返回一个channel
类,将用于执行数据传输 - 丢弃
GIL
,并调用THDProcessGroupInit()
函数,其实例化该channel
,并添加master
节点的地址 rank 0
对应的进程将会执行master
过程,而其他的进程则作为workers
- master
- 为所有的
worker
创建sockets
- 等待所有的
worker
连接 - 发送给他们所有其他进程的位置
- 每一个
worker
- 创建连接
master
的sockets
- 发送自己的位置信息
- 接受其他
workers
的信息 - 打开一个新的
socket
,并与其他wokers
进行握手信号 - 初始化结束,所有的进程之间相互连接
共享文件系统初始化
另一种初始化方法就是利用文件系统,要求共享的文件对于组内所有进程可见。
import torch.distributed as dist # rank should always be specified dist.init_process_group(backend, init_method='file:///mnt/nfs/sharedfile', world_size=4, rank=args.rank)
说明:
其中,以file://
为前缀,表示文件系统各式初始化。/mnt/nfs/sharedfile
表示共享的文件,各个进程在共享文件系统中通过该文件进行同步或异步。因此,所有进程必须对该文件具有读写权限。
每一个进程将会打开这个文件,写入自己的信息,并等待直到其他所有进程完成该操作。在此之后,所有的请求信息将会被所有的进程可访问,为了避免race conditions
,文件系统必须支持通过 fcntl
锁定(大多数的 local
系统和NFS
均支持该特性)。
若指定为同一文件,则每次训练开始之前,该文件必须手动删除,但是文件所在路径必须存在!与
tcp
初始化方式一样,也需要为每一个进程手动指定rank
。
使用:
在主机01上:
python mnsit.py --init-method file://PathToShareFile/MultiNode --rank 0 --world-size 2 • 1
在主机02上:
python mnsit.py --init-method file://PathToShareFile/MultiNode --rank 1 --world-size 2 • 1
这里相比于TCP
的方式麻烦一点的是运行完一次必须更换共享的文件名,或者删除之前的共享文件,不然第二次运行会报错。
环境变量初始化
该方法将从环境变量中读取配置,允许完全自定义信息的获取方式。需要设置的变量有:
MASTER_PORT
:必须指定,需为rank 0
机器上的一个空闲端口。MASTER_ADDR
:必须指定,主进程rank 0
机器的地址。WORLD_SIZE
:必须指定,总进程数,可以在这里设置也可以在之前的init
函数中设置。RANK
:当前进程的rank
,也可以在init函数中指定。
rank 0
的机器需要建立所有的连接。
- 使用实例:
Node 1: (IP: 192.168.1.1, and has a free port: 1234)
>>> python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE --nnodes=2 --node_rank=0 --master_addr="192.168.1.1" --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other arguments of your training script)
Node 2
>>> python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE --nnodes=2 --node_rank=1 --master_addr="192.168.1.1" --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other arguments of your training script)
分布式Key-Value Store
distributed
包自带一个分布式key-value
存储,可以用来与不同的进程之间的通信,也可以在torch.distributed.init_process_group()
中进行初始化。有三种Key-Value
的存储方式:TCPStore
, FileStore
, 和HashStore
。
更多的资料参考:https://pytorch.org/docs/stable/distributed.html
这里没有写完,感兴趣的可以自己查看。