PyTorch并行与分布式(四)Distributed Data Papallel

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: PyTorch并行与分布式(四)Distributed Data Papallel

简要概览

  pytorch官方提供的分布式数据并行类为:

torch.nn.parallel.DistributedDataParallel(module, 
      device_ids=None, 
      output_device=None, 
      dim=0, 
      broadcast_buffers=True, 
      process_group=None, 
      bucket_cap_mb=25, 
      find_unused_parameters=False, 
      check_reduction=False, 
      gradient_as_bucket_view=False)

  与torch.nn.DataParallel一样,torch.nn.parallel.DistributedDataParallel都是基于torch.distributed的。将数据依据batch_size划分,然后分布到不同的GPU上运行,之后再汇总。反向传播过程中,每个节点的梯度都被平均了。

  实例化这个类之前要求torch.distributed已经被初始化了,通过调用torch.distributed.init_process_group()实现。

  为了在一台主机上使用NGPU的话,需要生成N个进程,确保每个进程只在0 00~N − 1 N-1N1的单个GPU上运行,这可以通过为每个进程设置CUDA_VISIBLE_DEVICES或者调用以下代码来实现:

>>> torch.cuda.set_device(i)

  其中i0 00N − 1 N-1N1。在每个进程中参考以下设置来构建模块:

>>> torch.distributed.init_process_group(
>>>     backend='nccl', world_size=N, init_method='...'
>>> )
>>> model = DistributedDataParallel(model, device_ids=[i], output_device=i)

  为了在每个节点上生成多个进程,可以采用torch.distributed.launch或者torch.multiprocessing.spawn的方式。

注意:nccl后端是目前使用GPU时速度最快、强烈推荐的后端。这适用于单节点和多节点分布式训练。

注意:这个模块还支持混合精度分布式训练。这意味着你的模型可以有不同类型的参数,比如fp16fp32的混合类型,对这些混合类型的参数进行梯度还原就可以了。

注意:如果您在一个进程上使用torch.save来checkpoint模块,而在其他进程上使用 torch.load来恢复模块,请确保每个进程都正确配置了map_location。如果没有map_locationtorch.load会将模块恢复到模块保存的设备上。

注意:当一个模型在batch_size=NM个节点上进行训练时,与在batch_size=M*N的单个节点上训练的同一个模型相比,如果损失是在一个batch中的各个实例之间进行加总(而不是像往常一样进行平均),那么梯度将小M倍(因为不同节点之间的梯度是平均的)。当你想获得一个与本地训练对应的数学上等价的训练过程时,你应该考虑到这一点。但在大多数情况下,你可以只把一个分布式数据并行封装模型、一个数据并行封装模型和一个单GPU上的普通模型视为相同的模型(例如在同等批次大小的情况下使用相同的学习率)。

注意:参数是不会在进程间广播的。模块在梯度上执行all-reduce step(全还原步骤),并假设它们将在所有进程中被优化器以同样的方式修改。Buffers(如BatchNorm统计)在每次迭代时都会从rank 0的进程中的模块广播到系统中的所有其他副本。

DistributedDataParallel与分布式RPC框架联合使用

  如果您将DistributedDataParallel分布式 RPC 框架结合使用,您应该始终使用torch.distributed.autograd.backward来计算梯度,并使用torch.distributed.optim.DistributedOptimizer来优化参数。

>>> import torch.distributed.autograd as dist_autograd
>>> from torch.nn.parallel import DistributedDataParallel as DDP
>>> from torch import optim
>>> from torch.distributed.optim import DistributedOptimizer
>>> from torch.distributed.rpc import RRef
>>>
>>> t1 = torch.rand((3, 3), requires_grad=True)
>>> t2 = torch.rand((3, 3), requires_grad=True)
>>> rref = rpc.remote("worker1", torch.add, args=(t1, t2))
>>> ddp_model = DDP(my_model)
>>>
>>> # Setup optimizer
>>> optimizer_params = [rref]
>>> for param in ddp_model.parameters():
>>>     optimizer_params.append(RRef(param))
>>>
>>> dist_optim = DistributedOptimizer(
>>>     optim.SGD,
>>>     optimizer_params,
>>>     lr=0.05,
>>> )
>>>
>>> with dist_autograd.context() as context_id:
>>>     pred = ddp_model(rref.to_here())
>>>     loss = loss_func(pred, loss)
>>>     dist_autograd.backward(context_id, loss)
>>>     dist_optim.step()

警告:构造函数、前向方法、输出(或本模块输出的函数)的分化是分布式的同步点。要考虑到这一点,以防不同进程可能执行不同的代码。

警告:module假设在创建模型时,所有参数都已注册。也就是说之后不应添加或删除任何参数。缓冲区也是如此。

警告:module假设所有分布式进程的module参数在模型中的注册顺序是相同的。module将按照模型中注册参数的相反顺序进行梯度还原。换句话说,用户有责任保证每个分布式进程的模型完全相同,从而保证参数注册顺序完全相同。

警告:module允许参数具有non-rowmajor-contiguous strides(非行主连续步长)。例如,您的模型可能包含一些参数,其torch.memory_formattorch.contiguous_format,而其他参数的格式是torch.channels_last。但是,不同进程中的相应参数必须具有相同的跨度。

警告:这个模块不能和torch.autograd.grad()一起使用(也就是说,只有在参数的.grad属性中要累积grad的时候,它才会工作)。

警告:如果你打算将这个模块与nccl后端或gloo后端(that uses Infiniband),以及使用多个workerDataLoader一起使用,请将多处理启动方法改为forkerver(仅Python 3)或spwn。不幸的是,Gloo(that uses Infiniband)和NCCL2不是fork安全的,如果你不改变这个设置,你很可能会遇到死锁。

警告:在用DistributedDataParallel包装好模型后,你千万不要试图改变模型的参数。因为,当用DistributedDataParallel封装你的模型时, DistributedDataParallel的构造函数会在构造时对模型本身的所有参数注册额外的梯度还原函数。如果之后你改变了模型的参数,梯度重函数不再匹配正确的参数集。

警告:gradient_as_bucket_view模式还不能与自动混合精度(Automatic Mixed Precision AMP)一起使用。AMP会维护用于取消缩放梯度的隐蔽梯度,当gradient_as_bucket_view=True时,这些隐蔽的梯度将指向通信设备。当gradient_as_bucket_view=True时,在第一次迭代中,这些隐藏的梯度将指向通信桶。在下一次迭代中,通信桶会发生变化,因此这些隐藏的梯度也会意外地发生变化,这可能会导致错误的结果。

参数解析

torch.nn.parallel.DistributedDataParallel

torch.nn.parallel.DistributedDataParallel(module, 
      device_ids=None, 
      output_device=None, 
      dim=0, 
      broadcast_buffers=True, 
      process_group=None, 
      bucket_cap_mb=25, 
      find_unused_parameters=False, 
      check_reduction=False, 
      gradient_as_bucket_view=False)
  1. module:需要被并行的module。
  2. device_ids:输入参数为一个int类型的list。当单个CUDA设备的时候需要提供这个参数,对于单个设备module,第i个module被复制到device_ids[i]上。对于多个设备的module,或者是CPU的module的时候,device_ids需要为None或者为空list,前向传播输入的数据需要放置到正确的设备上。(对于单设备modules默认为所有可用的devices)。
  3. output_device:单设备模块的输出设备位置,对于多设备modules或者是CPU的modules来说,这个参数需要为None,module本身决定输出位置。(对于单设备来说,默认为device_ids[0])
  4. broadcast_buffers:布尔类型的变量,前向传播时,使得module的buffers同步的控制变量,默认为True。
  5. process_group:用于分布式数据all-reduction的进程组,如果为None,默认的进程组由torch.distributed.init_process_group创建。
  6. bucket_cap_mb:DistributedDataParallel会将bucket 参数放置到多个bucket中,这样梯度在反向计算的过程中就能够覆盖。bucket_cap_mb控制bucket的大小,默认为25。
  7. find_unused_parameters:布尔类型的变量。遍历所有tensor中的 autograd graph,包含了被封装的前向传播的返回值。graph中没有收到梯度的参数会被预先标记为可以被还原。前向传播之后的输出都必须计算loss和梯度,如果没有,这个封装函数会等待autograd产生梯度参数。Any outputs derived from module parameters that are otherwise unused can be detached from the autograd graph using torch.Tensor.detach. (default: False)
  8. check_reduction:这个参数已经被废弃。
  9. gradient_as_bucket_view:布尔类型的变量。This is a prototype feature and subject to changes. When set to True, gradients will be views pointing to different offsets of allreduce communication buckets. This can reduce peak memory usage, where the saved memory size will be equal to the total gradients size. Moreover, it avoids the overhead of copying between gradients and allreduce communication buckets. When gradients are views, detach_() cannot be called on the gradients. If hitting such errors, please fix it by referring to the zero_grad() function in torch/optim/optimizer.py as a solution.

  主要的使用方法为:

>>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...')
>>> net = torch.nn.DistributedDataParallel(model, pg)
• 1
• 2

join函数解析

  • join(divide_by_initial_world_size=True, enable=True)

  一个与 torch.nn.parallel.DistributedDataParallel 实例结合使用的context manager,以便能够在参与进程间进行不均匀输入的训练。

  这个context manager将跟踪已经加入的 DDP 进程,并通过插入collective communication操作与非加入的 DDP 进程创建的操作相匹配来 "shadow "前向和后向传递。这将确保每个集体调用都有一个已经加入的DDP进程的对应调用,防止在各进程输入不均的情况下进行训练时发生挂起或错误。

  一旦所有DDP进程加入,context manager将向所有进程广播与最后加入的进程相对应的模型,以确保所有进程的模型是相同的(这是DDP保证的)。

  要使用这一点来实现各进程间输入不均匀的训练,只需将此context manager封装在训练循环中即可。无需进一步修改模型或数据加载。

警告:此module只在多进程单设备上使用,意味着一个单个进程在一个GPU上运行。

警告:This module currently does not support custom distributed collective operations in the forward pass, such as SyncBatchNorm or other custom defined collectives in the model’s forward pass.

  此函数有两个参数:divide_by_initial_world_sizeenable

  1. divide_by_initial_world_size

  如果为True,将用DDP训练launched时,会将初始world_size除以梯度。如果为False,将在all reduce过程中计算有效世界大小(number of ranks that have not depleted their inputs yet),并将梯度除以该大小。设置 divide_by_initial_world_size=True,以确保每个输入样本(包括不均匀的输入)在对全局梯度的贡献度上具有同等权重。这是通过将梯度除以初始world_size来实现的,即使我们遇到不均匀的输入。如果你把这个设置为False,我们会把梯度除以剩余的节点数。这确保了在较小的world_size上进行训练的平等性,尽管这也意味着不均匀的输入会对全局梯度做出更大的贡献。通常情况下,当训练作业的最后几个输入是不均匀的时候,你会希望将此设置为True。在极端的情况下,如果输入的数量有很大的差异,将此设置为False可能会得到更好的结果。

  1. enable

  是否启用不均匀输入检测。默认为True

>>>  import torch
>>>  import torch.distributed as dist
>>>  import os
>>>  import torch.multiprocessing as mp
>>>  import torch.nn as nn
>>>  # On each spawned worker
>>>  def worker(rank):
>>>      dist.init_process_group("nccl", rank=rank, world_size=2)
>>>      torch.cuda.set_device(rank)
>>>      model = nn.Linear(1, 1, bias=False).to(rank)
>>>      model = torch.nn.parallel.DistributedDataParallel(
>>>          model, device_ids=[rank], output_device=rank
>>>      )
>>>      # Rank 1 gets one more input than rank 0.
>>>      inputs = [torch.tensor([1]).float() for _ in range(10 + rank)]
>>>      with model.join():
>>>          for _ in range(5):
>>>              for inp in inputs:
>>>                  loss = model(inp).sum()
>>>                  loss.backward()
>>>  # Without the join() API, the below synchronization will hang
>>>  # blocking for rank 1's allreduce to complete.
>>>  torch.cuda.synchronize(device=rank)

no_sync函数解析

  一个context manager,用于禁用跨DDP进程的gradient synchronizations(梯度同步)。在此context中,梯度将在module变量上累积,随后在退出context的第一个前向-后向通道中进行同步。

>>> ddp = torch.nn.DistributedDataParallel(model, pg)
>>> with ddp.no_sync():
>>>   for input in inputs:
>>>     ddp(input).backward()  # no synchronization, accumulate grads
>>> ddp(another_input).backward()  # synchronize grads
• 1
• 2
• 3
• 4
• 5

看到这我哭了啊,这跟我想要的东西不太一样,哎,没动力看下去了,以后再说这个吧。

源码解析

实例

  pytorch官方在github上提供了examples仓库,包含了各种深度学习任务的模型和相关示例代码,这里 我们以Pytorch官方仓库里的ResNet50的分布式训练源码为例,简单讲解下pytorch分布式训练相关方法和参数。

  分布式训练的第一步是需要设置分布式进程组,设置多机通信后端、本机ip端口号、节点总数、本机编号等信息。 (源码129行):

dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,
                                world_size=args.world_size, rank=args.rank)

将上述分布式相关参数,传递到torch.distributed.init_process_group并初始化用于训练的进程组; 初始化进程组之前,我们首先看下main.py的相关参数设置: 相关参数 源码第59行:

parser.add_argument('--world-size', default=-1, type=int,
                    help='number of nodes for distributed training')
parser.add_argument('--rank', default=-1, type=int,
                    help='node rank for distributed training')
parser.add_argument('--dist-url', default='tcp://224.66.41.62:23456', type=str,
                    help='url used to set up distributed training')
parser.add_argument('--dist-backend', default='nccl', type=str,
                    help='distributed backend')
parser.add_argument('--seed', default=None, type=int,
                    help='seed for initializing training. ')
parser.add_argument('--gpu', default=None, type=int,
                    help='GPU id to use.')
parser.add_argument('--multiprocessing-distributed', action='store_true',
                    help='Use multi-processing distributed training to launch '
                         'N processes per node, which has N GPUs. This is the '
                         'fastest way to use PyTorch for either single node or '
                         'multi node data parallel training')

–world-size 表示分布式训练中,机器节点总数

–rank 表示节点编号(n台节点即:0,1,2,…,n-1)

–multiprocessing-distributed 是否开启多进程模式(单机、多机都可开启)

–dist-url 本机的ip,端口号,用于多机通信

–dist-backend 多机通信后端,默认使用nccl

创建模型 分布式进程组初始化完成后,需要将模型通过DDP进行包装。

源码153行:

model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu]) # 通过DDP接口创建一个多机model实例。
• 1

数据切分和DataLoader 准备好模型后,需要准备分布式训练所需的数据集,在分布式训练任务中(数据并行)多机的Dataloader和普通dataloader也有所区别,需要用DistributedSampler包装后再通过torch.utils.data.DataLoader实例化成Dataloader。

源码217行:

train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset) 通过DistributedSampler创建一个wapper,将数据集放入其中,再通过 torch.utils.data.DataLoader 创建可用于多机的Dataloader;

if args.distributed:
        train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
    else:
        train_sampler = None
    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None),
        num_workers=args.workers, pin_memory=True, sampler=train_sampler)

其余部分,和正常的单机版训练差异不大,此处就不赘述了。

完整的利用ResNet50训练ImageNet的示例可参考:Pytorch官方仓库

分布式训练速度测评及结果,可以参考DLPerf:PyTorch ResNet50 v1.5测评

参考

相关实践学习
部署Stable Diffusion玩转AI绘画(GPU云服务器)
本实验通过在ECS上从零开始部署Stable Diffusion来进行AI绘画创作,开启AIGC盲盒。
相关文章
|
18天前
|
机器学习/深度学习 边缘计算 人工智能
第二届边缘计算与并行、分布式计算国际学术会议(ECPDC 2025) 2025 2nd international Conference on Edge Computing, Parallel and Distributed Computing
第二届边缘计算与并行、分布式计算国际学术会议(ECPDC 2025) 2025 2nd international Conference on Edge Computing, Parallel and Distributed Computing 机器学习 计算学习理论 数据挖掘 科学计算 计算应用 数字图像处理 人工智能
46 6
|
3月前
|
机器学习/深度学习 自然语言处理 数据可视化
分布式表示(Distributed Representation)
分布式表示(Distributed Representation)
152 15
|
4月前
|
分布式计算 并行计算 大数据
NumPy 并行计算与分布式部署
【8月更文第30天】随着数据量的不断增长,传统的单机计算模型已经难以满足对大规模数据集处理的需求。并行和分布式计算成为了处理这些大数据集的关键技术。虽然 NumPy 本身并不直接支持并行计算,但可以通过结合其他库如 Numba 和 Dask 来实现高效的并行和分布式计算。
40 1
|
4月前
|
机器学习/深度学习 并行计算 PyTorch
PyTorch与DistributedDataParallel:分布式训练入门指南
【8月更文第27天】随着深度学习模型变得越来越复杂,单一GPU已经无法满足训练大规模模型的需求。分布式训练成为了加速模型训练的关键技术之一。PyTorch 提供了多种工具来支持分布式训练,其中 DistributedDataParallel (DDP) 是一个非常受欢迎且易用的选择。本文将详细介绍如何使用 PyTorch 的 DDP 模块来进行分布式训练,并通过一个简单的示例来演示其使用方法。
396 2
|
4月前
|
机器学习/深度学习 自然语言处理 数据可视化
分布式表示(Distributed Representation)
分布式表示(Distributed Representation)
|
4月前
|
机器学习/深度学习 分布式计算 PyTorch
构建可扩展的深度学习系统:PyTorch 与分布式计算
【8月更文第29天】随着数据量和模型复杂度的增加,单个GPU或CPU已无法满足大规模深度学习模型的训练需求。分布式计算提供了一种解决方案,能够有效地利用多台机器上的多个GPU进行并行训练,显著加快训练速度。本文将探讨如何使用PyTorch框架实现深度学习模型的分布式训练,并通过一个具体的示例展示整个过程。
167 0
|
4月前
|
存储 异构计算
自研分布式训练框架EPL问题之通过strategy annotation实现流水并行如何解决
自研分布式训练框架EPL问题之通过strategy annotation实现流水并行如何解决
|
5月前
|
分布式计算 API 对象存储
Ray是一个开源的分布式计算框架,用于构建和扩展分布式应用。它提供了简单的API,使得开发者可以轻松地编写并行和分布式代码,而无需担心底层的复杂性。
Ray是一个开源的分布式计算框架,用于构建和扩展分布式应用。它提供了简单的API,使得开发者可以轻松地编写并行和分布式代码,而无需担心底层的复杂性。
871 11
|
2月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
4月前
|
NoSQL Redis
基于Redis的高可用分布式锁——RedLock
这篇文章介绍了基于Redis的高可用分布式锁RedLock的概念、工作流程、获取和释放锁的方法,以及RedLock相比单机锁在高可用性上的优势,同时指出了其在某些特殊场景下的不足,并提到了ZooKeeper作为另一种实现分布式锁的方案。
116 2
基于Redis的高可用分布式锁——RedLock