简要概览
pytorch
官方提供的分布式数据并行类为:
torch.nn.parallel.DistributedDataParallel(module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True, process_group=None, bucket_cap_mb=25, find_unused_parameters=False, check_reduction=False, gradient_as_bucket_view=False)
与torch.nn.DataParallel
一样,torch.nn.parallel.DistributedDataParallel
都是基于torch.distributed
的。将数据依据batch_size
划分,然后分布到不同的GPU
上运行,之后再汇总。反向传播过程中,每个节点的梯度都被平均了。
实例化这个类之前要求torch.distributed
已经被初始化了,通过调用torch.distributed.init_process_group()
实现。
为了在一台主机上使用N
块GPU
的话,需要生成N
个进程,确保每个进程只在0 00~N − 1 N-1N−1的单个GPU
上运行,这可以通过为每个进程设置CUDA_VISIBLE_DEVICES
或者调用以下代码来实现:
>>> torch.cuda.set_device(i)
其中i
从0 00到N − 1 N-1N−1。在每个进程中参考以下设置来构建模块:
>>> torch.distributed.init_process_group( >>> backend='nccl', world_size=N, init_method='...' >>> ) >>> model = DistributedDataParallel(model, device_ids=[i], output_device=i)
为了在每个节点上生成多个进程,可以采用torch.distributed.launch
或者torch.multiprocessing.spawn
的方式。
注意:
nccl
后端是目前使用GPU时速度最快、强烈推荐的后端。这适用于单节点和多节点分布式训练。注意:这个模块还支持混合精度分布式训练。这意味着你的模型可以有不同类型的参数,比如
fp16
和fp32
的混合类型,对这些混合类型的参数进行梯度还原就可以了。注意:如果您在一个进程上使用
torch.save
来checkpoint模块,而在其他进程上使用torch.load
来恢复模块,请确保每个进程都正确配置了map_location
。如果没有map_location
,torch.load
会将模块恢复到模块保存的设备上。注意:当一个模型在
batch_size=N
的M
个节点上进行训练时,与在batch_size=M*N
的单个节点上训练的同一个模型相比,如果损失是在一个batch
中的各个实例之间进行加总(而不是像往常一样进行平均),那么梯度将小M
倍(因为不同节点之间的梯度是平均的)。当你想获得一个与本地训练对应的数学上等价的训练过程时,你应该考虑到这一点。但在大多数情况下,你可以只把一个分布式数据并行封装模型、一个数据并行封装模型和一个单GPU上的普通模型视为相同的模型(例如在同等批次大小的情况下使用相同的学习率)。注意:参数是不会在进程间广播的。模块在梯度上执行
all-reduce step
(全还原步骤),并假设它们将在所有进程中被优化器以同样的方式修改。Buffers
(如BatchNorm
统计)在每次迭代时都会从rank 0
的进程中的模块广播到系统中的所有其他副本。
DistributedDataParallel与分布式RPC框架联合使用
如果您将DistributedDataParallel
与分布式 RPC 框架结合使用,您应该始终使用torch.distributed.autograd.backward来计算梯度,并使用torch.distributed.optim.DistributedOptimizer来优化参数。
>>> import torch.distributed.autograd as dist_autograd >>> from torch.nn.parallel import DistributedDataParallel as DDP >>> from torch import optim >>> from torch.distributed.optim import DistributedOptimizer >>> from torch.distributed.rpc import RRef >>> >>> t1 = torch.rand((3, 3), requires_grad=True) >>> t2 = torch.rand((3, 3), requires_grad=True) >>> rref = rpc.remote("worker1", torch.add, args=(t1, t2)) >>> ddp_model = DDP(my_model) >>> >>> # Setup optimizer >>> optimizer_params = [rref] >>> for param in ddp_model.parameters(): >>> optimizer_params.append(RRef(param)) >>> >>> dist_optim = DistributedOptimizer( >>> optim.SGD, >>> optimizer_params, >>> lr=0.05, >>> ) >>> >>> with dist_autograd.context() as context_id: >>> pred = ddp_model(rref.to_here()) >>> loss = loss_func(pred, loss) >>> dist_autograd.backward(context_id, loss) >>> dist_optim.step()
警告:构造函数、前向方法、输出(或本模块输出的函数)的分化是分布式的同步点。要考虑到这一点,以防不同进程可能执行不同的代码。
警告:
module
假设在创建模型时,所有参数都已注册。也就是说之后不应添加或删除任何参数。缓冲区也是如此。警告:
module
假设所有分布式进程的module
参数在模型中的注册顺序是相同的。module
将按照模型中注册参数的相反顺序进行梯度还原。换句话说,用户有责任保证每个分布式进程的模型完全相同,从而保证参数注册顺序完全相同。警告:
module
允许参数具有non-rowmajor-contiguous strides
(非行主连续步长)。例如,您的模型可能包含一些参数,其torch.memory_format
是torch.contiguous_format
,而其他参数的格式是torch.channels_last
。但是,不同进程中的相应参数必须具有相同的跨度。警告:这个模块不能和
torch.autograd.grad()
一起使用(也就是说,只有在参数的.grad
属性中要累积grad
的时候,它才会工作)。警告:如果你打算将这个模块与
nccl
后端或gloo
后端(that uses Infiniband),以及使用多个worker
的DataLoader
一起使用,请将多处理启动方法改为forkerver
(仅Python 3)或spwn
。不幸的是,Gloo(that uses Infiniband)和NCCL2不是fork安全的,如果你不改变这个设置,你很可能会遇到死锁。警告:在用
DistributedDataParallel
包装好模型后,你千万不要试图改变模型的参数。因为,当用DistributedDataParallel
封装你的模型时,DistributedDataParalle
l的构造函数会在构造时对模型本身的所有参数注册额外的梯度还原函数。如果之后你改变了模型的参数,梯度重函数不再匹配正确的参数集。警告:
gradient_as_bucket_view
模式还不能与自动混合精度(Automatic Mixed Precision AMP)一起使用。AMP会维护用于取消缩放梯度的隐蔽梯度,当gradient_as_bucket_view=True时,这些隐蔽的梯度将指向通信设备。当gradient_as_bucket_view=True时,在第一次迭代中,这些隐藏的梯度将指向通信桶。在下一次迭代中,通信桶会发生变化,因此这些隐藏的梯度也会意外地发生变化,这可能会导致错误的结果。
参数解析
torch.nn.parallel.DistributedDataParallel
torch.nn.parallel.DistributedDataParallel(module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True, process_group=None, bucket_cap_mb=25, find_unused_parameters=False, check_reduction=False, gradient_as_bucket_view=False)
module
:需要被并行的module。device_ids
:输入参数为一个int类型的list。当单个CUDA设备的时候需要提供这个参数,对于单个设备module,第i个module被复制到device_ids[i]上。对于多个设备的module,或者是CPU的module的时候,device_ids需要为None或者为空list,前向传播输入的数据需要放置到正确的设备上。(对于单设备modules默认为所有可用的devices)。output_device
:单设备模块的输出设备位置,对于多设备modules或者是CPU的modules来说,这个参数需要为None,module本身决定输出位置。(对于单设备来说,默认为device_ids[0])broadcast_buffers
:布尔类型的变量,前向传播时,使得module的buffers同步的控制变量,默认为True。process_group
:用于分布式数据all-reduction的进程组,如果为None,默认的进程组由torch.distributed.init_process_group创建。bucket_cap_mb
:DistributedDataParallel会将bucket 参数放置到多个bucket中,这样梯度在反向计算的过程中就能够覆盖。bucket_cap_mb控制bucket的大小,默认为25。find_unused_parameters
:布尔类型的变量。遍历所有tensor中的 autograd graph,包含了被封装的前向传播的返回值。graph中没有收到梯度的参数会被预先标记为可以被还原。前向传播之后的输出都必须计算loss和梯度,如果没有,这个封装函数会等待autograd产生梯度参数。Any outputs derived from module parameters that are otherwise unused can be detached from the autograd graph using torch.Tensor.detach. (default: False)check_reduction
:这个参数已经被废弃。gradient_as_bucket_view
:布尔类型的变量。This is a prototype feature and subject to changes. When set to True, gradients will be views pointing to different offsets of allreduce communication buckets. This can reduce peak memory usage, where the saved memory size will be equal to the total gradients size. Moreover, it avoids the overhead of copying between gradients and allreduce communication buckets. When gradients are views, detach_() cannot be called on the gradients. If hitting such errors, please fix it by referring to the zero_grad() function in torch/optim/optimizer.py as a solution.
主要的使用方法为:
>>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...') >>> net = torch.nn.DistributedDataParallel(model, pg) • 1 • 2
join函数解析
- join(divide_by_initial_world_size=True, enable=True)
一个与 torch.nn.parallel.DistributedDataParallel 实例结合使用的context manager,以便能够在参与进程间进行不均匀输入的训练。
这个context manager将跟踪已经加入的 DDP 进程,并通过插入collective communication操作与非加入的 DDP 进程创建的操作相匹配来 "shadow "前向和后向传递。这将确保每个集体调用都有一个已经加入的DDP进程的对应调用,防止在各进程输入不均的情况下进行训练时发生挂起或错误。
一旦所有DDP进程加入,context manager将向所有进程广播与最后加入的进程相对应的模型,以确保所有进程的模型是相同的(这是DDP保证的)。
要使用这一点来实现各进程间输入不均匀的训练,只需将此context manager封装在训练循环中即可。无需进一步修改模型或数据加载。
警告:此module只在多进程单设备上使用,意味着一个单个进程在一个GPU上运行。
警告:This module currently does not support custom distributed collective operations in the forward pass, such as SyncBatchNorm or other custom defined collectives in the model’s forward pass.
此函数有两个参数:divide_by_initial_world_size
和enable
。
- divide_by_initial_world_size
如果为True,将用DDP训练launched时,会将初始world_size除以梯度。如果为False,将在all reduce过程中计算有效世界大小(number of ranks that have not depleted their inputs yet),并将梯度除以该大小。设置 divide_by_initial_world_size=True,以确保每个输入样本(包括不均匀的输入)在对全局梯度的贡献度上具有同等权重。这是通过将梯度除以初始world_size来实现的,即使我们遇到不均匀的输入。如果你把这个设置为False,我们会把梯度除以剩余的节点数。这确保了在较小的world_size上进行训练的平等性,尽管这也意味着不均匀的输入会对全局梯度做出更大的贡献。通常情况下,当训练作业的最后几个输入是不均匀的时候,你会希望将此设置为True。在极端的情况下,如果输入的数量有很大的差异,将此设置为False可能会得到更好的结果。
- enable
是否启用不均匀输入检测。默认为True
。
>>> import torch >>> import torch.distributed as dist >>> import os >>> import torch.multiprocessing as mp >>> import torch.nn as nn >>> # On each spawned worker >>> def worker(rank): >>> dist.init_process_group("nccl", rank=rank, world_size=2) >>> torch.cuda.set_device(rank) >>> model = nn.Linear(1, 1, bias=False).to(rank) >>> model = torch.nn.parallel.DistributedDataParallel( >>> model, device_ids=[rank], output_device=rank >>> ) >>> # Rank 1 gets one more input than rank 0. >>> inputs = [torch.tensor([1]).float() for _ in range(10 + rank)] >>> with model.join(): >>> for _ in range(5): >>> for inp in inputs: >>> loss = model(inp).sum() >>> loss.backward() >>> # Without the join() API, the below synchronization will hang >>> # blocking for rank 1's allreduce to complete. >>> torch.cuda.synchronize(device=rank)
no_sync函数解析
一个context manager
,用于禁用跨DDP进程的gradient synchronizations(梯度同步)。在此context中,梯度将在module变量上累积,随后在退出context的第一个前向-后向通道中进行同步。
>>> ddp = torch.nn.DistributedDataParallel(model, pg) >>> with ddp.no_sync(): >>> for input in inputs: >>> ddp(input).backward() # no synchronization, accumulate grads >>> ddp(another_input).backward() # synchronize grads • 1 • 2 • 3 • 4 • 5
看到这我哭了啊,这跟我想要的东西不太一样,哎,没动力看下去了,以后再说这个吧。
源码解析
实例
pytorch官方在github上提供了examples仓库,包含了各种深度学习任务的模型和相关示例代码,这里 我们以Pytorch官方仓库里的ResNet50的分布式训练源码为例,简单讲解下pytorch分布式训练相关方法和参数。
分布式训练的第一步是需要设置分布式进程组,设置多机通信后端、本机ip端口号、节点总数、本机编号等信息。 (源码129行):
dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url, world_size=args.world_size, rank=args.rank)
将上述分布式相关参数,传递到torch.distributed.init_process_group并初始化用于训练的进程组; 初始化进程组之前,我们首先看下main.py的相关参数设置: 相关参数 源码第59行:
parser.add_argument('--world-size', default=-1, type=int, help='number of nodes for distributed training') parser.add_argument('--rank', default=-1, type=int, help='node rank for distributed training') parser.add_argument('--dist-url', default='tcp://224.66.41.62:23456', type=str, help='url used to set up distributed training') parser.add_argument('--dist-backend', default='nccl', type=str, help='distributed backend') parser.add_argument('--seed', default=None, type=int, help='seed for initializing training. ') parser.add_argument('--gpu', default=None, type=int, help='GPU id to use.') parser.add_argument('--multiprocessing-distributed', action='store_true', help='Use multi-processing distributed training to launch ' 'N processes per node, which has N GPUs. This is the ' 'fastest way to use PyTorch for either single node or ' 'multi node data parallel training')
–world-size 表示分布式训练中,机器节点总数
–rank 表示节点编号(n台节点即:0,1,2,…,n-1)
–multiprocessing-distributed 是否开启多进程模式(单机、多机都可开启)
–dist-url 本机的ip,端口号,用于多机通信
–dist-backend 多机通信后端,默认使用nccl
创建模型 分布式进程组初始化完成后,需要将模型通过DDP进行包装。
源码153行:
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu]) # 通过DDP接口创建一个多机model实例。 • 1
数据切分和DataLoader 准备好模型后,需要准备分布式训练所需的数据集,在分布式训练任务中(数据并行)多机的Dataloader和普通dataloader也有所区别,需要用DistributedSampler包装后再通过torch.utils.data.DataLoader实例化成Dataloader。
源码217行:
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset) 通过DistributedSampler创建一个wapper,将数据集放入其中,再通过 torch.utils.data.DataLoader 创建可用于多机的Dataloader;
if args.distributed: train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset) else: train_sampler = None train_loader = torch.utils.data.DataLoader( train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None), num_workers=args.workers, pin_memory=True, sampler=train_sampler)
其余部分,和正常的单机版训练差异不大,此处就不赘述了。
完整的利用ResNet50训练ImageNet的示例可参考:Pytorch官方仓库
分布式训练速度测评及结果,可以参考DLPerf:PyTorch ResNet50 v1.5测评