PyTorch 2.2 中文官方教程(十七)(3)https://developer.aliyun.com/article/1482597
处理速度不均衡
在 DDP 中,构造函数、前向传递和后向传递是分布式同步点。预期不同的进程将启动相同数量的同步,并按相同顺序到达这些同步点,并在大致相同的时间进入每个同步点。否则,快速进程可能会提前到达并在等待滞后者时超时。因此,用户负责在进程之间平衡工作负载分布。有时,由于网络延迟、资源竞争或不可预测的工作负载波动等原因,不可避免地会出现处理速度不均衡的情况。为了避免在这些情况下超时,请确保在调用init_process_group时传递一个足够大的timeout
值。
保存和加载检查点
在训练过程中,通常使用torch.save
和torch.load
来对模块进行检查点,并从检查点中恢复。有关更多详细信息,请参阅SAVING AND LOADING MODELS。在使用 DDP 时,一种优化是在一个进程中保存模型,然后加载到所有进程中,减少写入开销。这是正确的,因为所有进程都从相同的参数开始,并且在反向传递中梯度是同步的,因此优化器应该保持将参数设置为相同的值。如果使用此优化,请确保在保存完成之前没有进程开始加载。此外,在加载模块时,您需要提供一个适当的map_location
参数,以防止一个进程进入其他设备。如果缺少map_location
,torch.load
将首先将模块加载到 CPU,然后将每个参数复制到保存的位置,这将导致同一台机器上的所有进程使用相同的设备集。有关更高级的故障恢复和弹性支持,请参阅TorchElastic。
def demo_checkpoint(rank, world_size): print(f"Running DDP checkpoint example on rank {rank}.") setup(rank, world_size) model = ToyModel().to(rank) ddp_model = DDP(model, device_ids=[rank]) CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint" if rank == 0: # All processes should see same parameters as they all start from same # random parameters and gradients are synchronized in backward passes. # Therefore, saving it in one process is sufficient. torch.save(ddp_model.state_dict(), CHECKPOINT_PATH) # Use a barrier() to make sure that process 1 loads the model after process # 0 saves it. dist.barrier() # configure map_location properly map_location = {'cuda:%d' % 0: 'cuda:%d' % rank} ddp_model.load_state_dict( torch.load(CHECKPOINT_PATH, map_location=map_location)) loss_fn = nn.MSELoss() optimizer = optim.SGD(ddp_model.parameters(), lr=0.001) optimizer.zero_grad() outputs = ddp_model(torch.randn(20, 10)) labels = torch.randn(20, 5).to(rank) loss_fn(outputs, labels).backward() optimizer.step() # Not necessary to use a dist.barrier() to guard the file deletion below # as the AllReduce ops in the backward pass of DDP already served as # a synchronization. if rank == 0: os.remove(CHECKPOINT_PATH) cleanup()
将 DDP 与模型并行结合起来
DDP 也适用于多 GPU 模型。在训练大型模型和大量数据时,DDP 包装多 GPU 模型尤其有帮助。
class ToyMpModel(nn.Module): def __init__(self, dev0, dev1): super(ToyMpModel, self).__init__() self.dev0 = dev0 self.dev1 = dev1 self.net1 = torch.nn.Linear(10, 10).to(dev0) self.relu = torch.nn.ReLU() self.net2 = torch.nn.Linear(10, 5).to(dev1) def forward(self, x): x = x.to(self.dev0) x = self.relu(self.net1(x)) x = x.to(self.dev1) return self.net2(x)
当将多 GPU 模型传递给 DDP 时,device_ids
和output_device
必须不设置。输入和输出数据将由应用程序或模型的forward()
方法放置在适当的设备上。
def demo_model_parallel(rank, world_size): print(f"Running DDP with model parallel example on rank {rank}.") setup(rank, world_size) # setup mp_model and devices for this process dev0 = rank * 2 dev1 = rank * 2 + 1 mp_model = ToyMpModel(dev0, dev1) ddp_mp_model = DDP(mp_model) loss_fn = nn.MSELoss() optimizer = optim.SGD(ddp_mp_model.parameters(), lr=0.001) optimizer.zero_grad() # outputs will be on dev1 outputs = ddp_mp_model(torch.randn(20, 10)) labels = torch.randn(20, 5).to(dev1) loss_fn(outputs, labels).backward() optimizer.step() cleanup() if __name__ == "__main__": n_gpus = torch.cuda.device_count() assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}" world_size = n_gpus run_demo(demo_basic, world_size) run_demo(demo_checkpoint, world_size) world_size = n_gpus//2 run_demo(demo_model_parallel, world_size)
使用 torch.distributed.run/torchrun 初始化 DDP
我们可以利用 PyTorch Elastic 来简化 DDP 代码并更轻松地初始化作业。让我们仍然使用 Toymodel 示例并创建一个名为elastic_ddp.py
的文件。
import torch import torch.distributed as dist import torch.nn as nn import torch.optim as optim from torch.nn.parallel import DistributedDataParallel as DDP class ToyModel(nn.Module): def __init__(self): super(ToyModel, self).__init__() self.net1 = nn.Linear(10, 10) self.relu = nn.ReLU() self.net2 = nn.Linear(10, 5) def forward(self, x): return self.net2(self.relu(self.net1(x))) def demo_basic(): dist.init_process_group("nccl") rank = dist.get_rank() print(f"Start running basic DDP example on rank {rank}.") # create model and move it to GPU with id rank device_id = rank % torch.cuda.device_count() model = ToyModel().to(device_id) ddp_model = DDP(model, device_ids=[device_id]) loss_fn = nn.MSELoss() optimizer = optim.SGD(ddp_model.parameters(), lr=0.001) optimizer.zero_grad() outputs = ddp_model(torch.randn(20, 10)) labels = torch.randn(20, 5).to(device_id) loss_fn(outputs, labels).backward() optimizer.step() dist.destroy_process_group() if __name__ == "__main__": demo_basic()
然后可以在所有节点上运行 torch elastic/torchrun 命令来初始化上面创建的 DDP 作业:
torchrun --nnodes=2 --nproc_per_node=8 --rdzv_id=100 --rdzv_backend=c10d --rdzv_endpoint=$MASTER_ADDR:29400 elastic_ddp.py
我们在两台主机上运行 DDP 脚本,每台主机运行 8 个进程,也就是说我们在 16 个 GPU 上运行它。请注意,$MASTER_ADDR
在所有节点上必须相同。
torchrun 将启动 8 个进程,并在启动它的节点上的每个进程上调用elastic_ddp.py
,但用户还需要应用类似 slurm 的集群管理工具来实际在 2 个节点上运行此命令。
例如,在启用了 SLURM 的集群上,我们可以编写一个脚本来运行上面的命令,并将MASTER_ADDR
设置为:
export MASTER_ADDR=$(scontrol show hostname ${SLURM_NODELIST} | head -n 1)
然后我们可以使用 SLURM 命令运行此脚本:srun --nodes=2 ./torchrun_script.sh
。当然,这只是一个例子;您可以选择自己的集群调度工具来启动 torchrun 作业。
关于 Elastic run 的更多信息,可以查看这个快速入门文档以了解更多。
使用 PyTorch 编写分布式应用程序
原文:
pytorch.org/tutorials/intermediate/dist_tuto.html
译者:飞龙
作者:Séb Arnold
注:
查看并编辑此教程在github。
先决条件:
- PyTorch 分布式概述
在这个简短的教程中,我们将介绍 PyTorch 的分布式包。我们将看到如何设置分布式环境,使用不同的通信策略,并了解一些包的内部情况。
设置
PyTorch 中包含的分布式包(即torch.distributed
)使研究人员和实践者能够轻松地在进程和机器集群之间并行化他们的计算。为此,它利用消息传递语义,允许每个进程将数据传递给任何其他进程。与多进程(torch.multiprocessing
)包相反,进程可以使用不同的通信后端,并不限于在同一台机器上执行。
为了开始,我们需要能够同时运行多个进程的能力。如果您可以访问计算集群,您应该与您的本地系统管理员核实,或者使用您喜欢的协调工具(例如,pdsh,clustershell,或其他工具)。在本教程中,我们将使用一台单机,并使用以下模板生成多个进程。
"""run.py:""" #!/usr/bin/env python import os import torch import torch.distributed as dist import torch.multiprocessing as mp def run(rank, size): """ Distributed function to be implemented later. """ pass def init_process(rank, size, fn, backend='gloo'): """ Initialize the distributed environment. """ os.environ['MASTER_ADDR'] = '127.0.0.1' os.environ['MASTER_PORT'] = '29500' dist.init_process_group(backend, rank=rank, world_size=size) fn(rank, size) if __name__ == "__main__": size = 2 processes = [] mp.set_start_method("spawn") for rank in range(size): p = mp.Process(target=init_process, args=(rank, size, run)) p.start() processes.append(p) for p in processes: p.join()
上面的脚本生成两个进程,每个进程都将设置分布式环境,初始化进程组(dist.init_process_group
),最后执行给定的run
函数。
让我们来看看init_process
函数。它确保每个进程都能通过一个主进程协调,使用相同的 IP 地址和端口。请注意,我们使用了gloo
后端,但也有其他后端可用。(参见第 5.1 节)我们将在本教程的最后讨论dist.init_process_group
中发生的魔法,但基本上它允许进程通过共享位置来相互通信。
点对点通信
发送和接收
一种进程向另一个进程传输数据的过程称为点对点通信。这些通过send
和recv
函数或它们的立即对应函数isend
和irecv
来实现。
"""Blocking point-to-point communication.""" def run(rank, size): tensor = torch.zeros(1) if rank == 0: tensor += 1 # Send the tensor to process 1 dist.send(tensor=tensor, dst=1) else: # Receive tensor from process 0 dist.recv(tensor=tensor, src=0) print('Rank ', rank, ' has data ', tensor[0])
在上面的例子中,两个进程都从零张量开始,然后进程 0 增加张量并将其发送给进程 1,以便它们最终都变为 1.0。请注意,进程 1 需要分配内存来存储将要接收的数据。
还要注意send
/recv
是阻塞的:两个进程都会停止,直到通信完成。另一方面,immediates 是非阻塞的;脚本会继续执行,方法会返回一个Work
对象,我们可以选择wait()
。
"""Non-blocking point-to-point communication.""" def run(rank, size): tensor = torch.zeros(1) req = None if rank == 0: tensor += 1 # Send the tensor to process 1 req = dist.isend(tensor=tensor, dst=1) print('Rank 0 started sending') else: # Receive tensor from process 0 req = dist.irecv(tensor=tensor, src=0) print('Rank 1 started receiving') req.wait() print('Rank ', rank, ' has data ', tensor[0])
在使用即时通信时,我们必须小心地处理发送和接收的张量。由于我们不知道数据何时会传输到其他进程,因此在req.wait()
完成之前,我们不应修改发送的张量或访问接收的张量。换句话说,
- 在
dist.isend()
之后写入tensor
会导致未定义的行为。 - 在
dist.irecv()
之后从tensor
中读取将导致未定义的行为。
然而,在执行req.wait()
之后,我们可以确保通信已经发生,并且存储在tensor[0]
中的值为 1.0。
点对点通信在我们希望更精细地控制进程通信时非常有用。它们可以用来实现复杂的算法,比如在百度的 DeepSpeech或Facebook 的大规模实验中使用的算法。(参见第 4.1 节)
集体通信
|
分散
|
收集
|
|
减少
|
全局归约
|
|
广播
|
全收集
|
与点对点通信相反,集合允许在组中的所有进程之间进行通信模式。组是所有进程的子集。要创建一个组,我们可以将一组秩传递给dist.new_group(group)
。默认情况下,集合在所有进程上执行,也称为世界。例如,为了获得所有进程上所有张量的总和,我们可以使用dist.all_reduce(tensor, op, group)
集合。
""" All-Reduce example.""" def run(rank, size): """ Simple collective communication. """ group = dist.new_group([0, 1]) tensor = torch.ones(1) dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group) print('Rank ', rank, ' has data ', tensor[0])
由于我们想要对组中所有张量的总和,我们使用dist.ReduceOp.SUM
作为减少运算符。一般来说,任何可交换的数学运算都可以用作运算符。PyTorch 默认提供了 4 种这样的运算符,都在逐元素级别工作:
dist.ReduceOp.SUM
,dist.ReduceOp.PRODUCT
,dist.ReduceOp.MAX
,dist.ReduceOp.MIN
。
除了dist.all_reduce(tensor, op, group)
之外,PyTorch 目前实现了总共 6 种集合操作。
dist.broadcast(tensor, src, group)
: 将tensor
从src
复制到所有其他进程。dist.reduce(tensor, dst, op, group)
: 将op
应用于每个tensor
,并将结果存储在dst
中。dist.all_reduce(tensor, op, group)
: 与 reduce 相同,但结果存储在所有进程中。dist.scatter(tensor, scatter_list, src, group)
: 将第 i ii 个张量scatter_list[i]
复制到第 i ii 个进程。dist.gather(tensor, gather_list, dst, group)
: 将tensor
从所有进程复制到dst
。dist.all_gather(tensor_list, tensor, group)
: 将tensor
从所有进程复制到tensor_list
,在所有进程上。dist.barrier(group)
: 阻塞组中的所有进程,直到每个进程都进入此函数。
分布式训练
注意: 您可以在此 GitHub 存储库中找到本节的示例脚本。
现在我们了解了分布式模块的工作原理,让我们用它来写一些有用的东西。我们的目标是复制DistributedDataParallel的功能。当然,这将是一个教学示例,在实际情况下,您应该使用上面链接的官方、经过充分测试和优化的版本。
我们简单地想要实现随机梯度下降的分布式版本。我们的脚本将让所有进程计算其模型在其数据批次上的梯度,然后平均它们的梯度。为了确保在改变进程数量时获得类似的收敛结果,我们首先需要对数据集进行分区。(您也可以使用tnt.dataset.SplitDataset,而不是下面的代码片段。)
""" Dataset partitioning helper """ class Partition(object): def __init__(self, data, index): self.data = data self.index = index def __len__(self): return len(self.index) def __getitem__(self, index): data_idx = self.index[index] return self.data[data_idx] class DataPartitioner(object): def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234): self.data = data self.partitions = [] rng = Random() rng.seed(seed) data_len = len(data) indexes = [x for x in range(0, data_len)] rng.shuffle(indexes) for frac in sizes: part_len = int(frac * data_len) self.partitions.append(indexes[0:part_len]) indexes = indexes[part_len:] def use(self, partition): return Partition(self.data, self.partitions[partition])
通过上面的片段,我们现在可以简单地使用以下几行代码对任何数据集进行分区:
""" Partitioning MNIST """ def partition_dataset(): dataset = datasets.MNIST('./data', train=True, download=True, transform=transforms.Compose([ transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,)) ])) size = dist.get_world_size() bsz = 128 / float(size) partition_sizes = [1.0 / size for _ in range(size)] partition = DataPartitioner(dataset, partition_sizes) partition = partition.use(dist.get_rank()) train_set = torch.utils.data.DataLoader(partition, batch_size=bsz, shuffle=True) return train_set, bsz
假设我们有 2 个副本,那么每个进程将有一个包含 30000 个样本的train_set
。我们还将批量大小除以副本数量,以保持总批量大小为 128。
我们现在可以编写我们通常的前向-后向-优化训练代码,并添加一个函数调用来平均我们模型的梯度。(以下内容在很大程度上受到官方PyTorch MNIST 示例的启发。)
""" Distributed Synchronous SGD Example """ def run(rank, size): torch.manual_seed(1234) train_set, bsz = partition_dataset() model = Net() optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5) num_batches = ceil(len(train_set.dataset) / float(bsz)) for epoch in range(10): epoch_loss = 0.0 for data, target in train_set: optimizer.zero_grad() output = model(data) loss = F.nll_loss(output, target) epoch_loss += loss.item() loss.backward() average_gradients(model) optimizer.step() print('Rank ', dist.get_rank(), ', epoch ', epoch, ': ', epoch_loss / num_batches)
还需要实现average_gradients(model)
函数,它简单地接受一个模型,并在整个世界范围内对其梯度进行平均。
""" Gradient averaging. """ def average_gradients(model): size = float(dist.get_world_size()) for param in model.parameters(): dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM) param.grad.data /= size
看这里!我们成功实现了分布式同步随机梯度下降,并且可以在大型计算机集群上训练任何模型。
**注意:**虽然最后一句话在技术上是正确的,但要实现同步 SGD 的生产级实现需要更多的技巧。再次使用已经经过测试和优化的内容。
我们自己的环形全局归约
作为一个额外的挑战,想象一下我们想要实现 DeepSpeech 的高效环形全局归约。使用点对点集合很容易实现这一目标。
""" Implementation of a ring-reduce with addition. """ def allreduce(send, recv): rank = dist.get_rank() size = dist.get_world_size() send_buff = send.clone() recv_buff = send.clone() accum = send.clone() left = ((rank - 1) + size) % size right = (rank + 1) % size for i in range(size - 1): if i % 2 == 0: # Send send_buff send_req = dist.isend(send_buff, right) dist.recv(recv_buff, left) accum[:] += recv_buff[:] else: # Send recv_buff send_req = dist.isend(recv_buff, right) dist.recv(send_buff, left) accum[:] += send_buff[:] send_req.wait() recv[:] = accum[:]
在上面的脚本中,allreduce(send, recv)
函数的签名与 PyTorch 中的略有不同。它接受一个 recv
张量,并将所有 send
张量的总和存储在其中。作为留给读者的练习,我们的版本与 DeepSpeech 中的版本之间仍然有一个区别:他们的实现将梯度张量分成块,以便最佳地利用通信带宽。(提示:torch.chunk)
高级主题
我们现在准备探索torch.distributed
更高级的功能。由于涉及内容较多,本节分为两个小节:
- 通信后端:在这里我们学习如何使用 MPI 和 Gloo 进行 GPU-GPU 通信。
- 初始化方法:我们了解如何最好地设置
dist.init_process_group()
中的初始协调阶段。
通信后端
torch.distributed
最优雅的一个方面是它能够抽象并构建在不同的后端之上。如前所述,目前在 PyTorch 中实现了三种后端:Gloo、NCCL 和 MPI。它们各自具有不同的规范和权衡,取决于所需的用例。支持的函数的比较表可以在这里找到。
Gloo 后端
到目前为止,我们已经广泛使用了Gloo 后端。作为一个开发平台,它非常方便,因为它包含在预编译的 PyTorch 二进制文件中,并且在 Linux(自 0.2 版本起)和 macOS(自 1.3 版本起)上都可以使用。它支持 CPU 上的所有点对点和集体操作,以及 GPU 上的所有集体操作。对于 CUDA 张量的集体操作的实现并不像 NCCL 后端提供的那样优化。
正如您肯定已经注意到的那样,如果您将model
放在 GPU 上,我们的分布式 SGD 示例将无法工作。为了使用多个 GPU,让我们也进行以下修改:
- 使用
device = torch.device("cuda:{}".format(rank))
model = Net()
->model = Net().to(device)
- 使用
data, target = data.to(device), target.to(device)
将数据和目标转移到设备上。
通过上述修改,我们的模型现在正在两个 GPU 上训练,您可以使用watch nvidia-smi
来监视它们的利用率。
MPI 后端
消息传递接口(MPI)是来自高性能计算领域的标准化工具。它允许进行点对点和集体通信,并且是 torch.distributed
API 的主要灵感来源。存在几种 MPI 的实现(例如 Open-MPI、MVAPICH2、Intel MPI),每种都针对不同的目的进行了优化。使用 MPI 后端的优势在于 MPI 在大型计算机集群上的广泛可用性和高度优化。一些最近的实现也能够利用 CUDA IPC 和 GPU Direct 技术,以避免通过 CPU 进行内存复制。
不幸的是,PyTorch 的二进制文件不能包含 MPI 实现,我们将不得不手动重新编译它。幸运的是,这个过程相当简单,因为在编译时,PyTorch 会自行寻找可用的 MPI 实现。以下步骤安装 MPI 后端,通过安装 PyTorch from source。
- 创建并激活您的 Anaconda 环境,按照指南安装所有先决条件,但是不要运行
python setup.py install
。 - 选择并安装您喜欢的 MPI 实现。请注意,启用 CUDA-aware MPI 可能需要一些额外的步骤。在我们的情况下,我们将使用不支持 GPU 的 Open-MPI:
conda install -c conda-forge openmpi
- 现在,转到您克隆的 PyTorch 存储库并执行
python setup.py install
。
为了测试我们新安装的后端,需要进行一些修改。
- 将
if __name__ == '__main__':
下面的内容替换为init_process(0, 0, run, backend='mpi')
。 - 运行
mpirun -n 4 python myscript.py
。
这些变化的原因是 MPI 需要在生成进程之前创建自己的环境。MPI 还将生成自己的进程,并执行初始化方法中描述的握手,使init_process_group
的rank
和size
参数变得多余。实际上,这是非常强大的,因为您可以通过向mpirun
传递附加参数来为每个进程定制计算资源。 (例如,每个进程的核心数,手动分配机器给特定的 rank,以及更多)这样做,您应该获得与其他通信后端相同的熟悉输出。
NCCL 后端
NCCL 后端提供了针对 CUDA 张量的集体操作的优化实现。如果您只使用 CUDA 张量进行集体操作,请考虑使用此后端以获得最佳性能。NCCL 后端已包含在带有 CUDA 支持的预构建二进制文件中。
初始化方法
为了完成本教程,让我们谈谈我们调用的第一个函数:dist.init_process_group(backend, init_method)
。特别是,我们将讨论不同的初始化方法,这些方法负责每个进程之间的初始协调步骤。这些方法允许您定义协调的方式。根据您的硬件设置,其中一种方法应该比其他方法更适合。除了以下部分,您还应该查看官方文档。
环境变量
在整个教程中,我们一直在使用环境变量初始化方法。通过在所有机器上设置以下四个环境变量,所有进程将能够正确连接到主节点,获取有关其他进程的信息,并最终与它们握手。
MASTER_PORT
:主机上将托管排名为 0 的进程的空闲端口。MASTER_ADDR
: 将托管排名为 0 的进程的机器的 IP 地址。WORLD_SIZE
:进程的总数,这样主进程就知道要等待多少个工作进程。RANK
:每个进程的排名,这样它们就会知道它是主进程还是工作进程。
共享文件系统
共享文件系统要求所有进程都能访问共享文件系统,并通过共享文件进行协调。这意味着每个进程将打开文件,写入其信息,并等待直到所有人都这样做。之后,所有必要的信息将立即对所有进程可用。为了避免竞争条件,文件系统必须支持通过fcntl进行锁定。
dist.init_process_group( init_method='file:///mnt/nfs/sharedfile', rank=args.rank, world_size=4)
传输控制协议(TCP)是一种面向连接的协议,它提供可靠的数据传输服务。TCP 在网络通信中起着重要作用,它确保数据在发送和接收之间的可靠传输。TCP 使用三次握手建立连接,并使用流量控制和拥塞控制来确保数据传输的稳定性。TCP 是互联网上最常用的协议之一,被广泛应用于各种网络应用中。
通过提供进程 0 的 IP 地址和可达端口号,可以通过 TCP 进行初始化。在这里,所有的工作进程都可以连接到进程 0,并交换彼此如何联系的信息。
dist.init_process_group( init_method='tcp://10.1.1.20:23456', rank=args.rank, world_size=4)
致谢
我想感谢 PyTorch 开发人员在他们的实现、文档和测试方面做得如此出色。当代码不清晰时,我总是可以依靠文档或测试找到答案。特别感谢 Soumith Chintala、Adam Paszke 和 Natalia Gimelshein 在初稿中提供深刻的评论并回答问题。