深度学习实践篇 第九章:DDP

简介: 简要介绍了DDP的使用。

参考教程:
what is DDP
pytorch distributed overview

@[TOC]

DDP介绍

什么是DDP

DDP的全称是DistributedDataParallel,它允许使用pytorch使用数据并行训练。
在非分布式训练中,你的模型被放在一个gpu上,你的模型接收来自一个dataloader的inputbatch,它对输入进行前向传播并且计算损失,然后进行反向传播并计算参数的梯度。优化器在这个过程中对模型的参数进行更新。
当我们现在多个gpu上进行训练时,DDP会在每个GPU上都启动一个进程,每个进程中都有一个模型的复制品replica(local copy),每一个模型和优化器的复制品都是相同的。模型的参数初始化相同,优化器也使用一样的随机种子。DDP在训练过程中维持了这种一致性。
现在,每个gpu的模型都是一样的。我们仍然从dataloader中获取inputbatch,但是这个时候我们使用的是DistributedSampler。这个sampler保证了每个模型接收到的inputbatch是不同的,这就是“data parallel” in DDP。
在每个进程中,模型接收到一个不同的输入,并且locally的进行前向传播和后向传播。因为输入是不同的,所以梯度的累积也是不同的。使用optimizer.step()会产生不同的结果。比如说我们现在有4个gpu,在4个gpu上有4个模型的local copy,那么这四个模型就是不同的模型,而不是我们想要的一个分布式的模型。
DDP中的同步机制,会把来自四个replicas的梯度累加起来,其中使用了一个算法,在replicas间进行通信,它不会等待所有的梯度都计算完成,而是在计算过程中就保持着通信,这保证了你的gpu总是在工作的。
image.png

综合来说,就是在n个gpu上的训练相当于同时进行n个模型的训练,这些模型是完全一样的,实时同步的,只不过传入的数据不一样。

DistributedSampler()

在之前介绍数据集构建的时候曾经提到过sampler的概念。假如dataloader中的shuffle = True,你使用的就是一个randomsampler;假如dataloader中的shuffle = False,你使用的就是一个sequentialsampler。

torch.utils.data.distributed.DistributedSampler(dataset, num_replicas=None, rank=None, shuffle=True, seed=0, drop_last=False)

DistributedSampler传入的第一个参数是dataset,它要求这个Dataset有一个constant的大小。iterable dataset也可以使用DDP,参考https://github.com/Lightning-AI/lightning/issues/15734。我先不看了,用的时候再说。
第二个参数是num_replicas,你想要复制的个数。第三个参数是rank,是当前的process的index。shuffle和seed主要是控制你的dataset是否要shuffle和随机种子。

DistributedSampler()一般和torch.nn.parallel.DistributedDataParallel组合使用。在这种情况下,每一个进程都会传递一个DistributedSampler实例,并且加载原始数据中彼此完全不同的子集。

在构造函数初始化阶段,会计算每个replica中分配到的samples数。

if self.drop_last and len(self.dataset) % self.num_replicas != 0:  # type: ignore[arg-type]
            # Split to nearest available length that is evenly divisible.
            # This is to ensure each rank receives the same amount of data when
            # using this Sampler.
            self.num_samples = math.ceil(
                (len(self.dataset) - self.num_replicas) / self.num_replicas  # type: ignore[arg-type]
            )
        else:
            self.num_samples = math.ceil(len(self.dataset) / self.num_replicas)  # type: ignore[arg-type]

最终获得的dataset中数据的总数就是

self.total_size = self.num_samples * self.num_replicas

在取数据的时候,会把不同的数据分配给不同的replicas。

indices = indices[self.rank:self.total_size:self.num_replicas]

在使用DistributedSampler时,在每个epoch都需要调用set_epoch(epoch)函数,这样每个epoch里数据都会按照不同的顺序获取,如果不调用的话,获取的顺序将始终保持一致。因为它随机种子的生成也和当前的epoch有关系。

g.manual_seed(self.seed + self.epoch)

下面给出了一个使用的例子。

sampler = DistributedSampler(dataset) if is_distributed else None
loader = DataLoader(dataset, shuffle=(sampler is None),
                    sampler=sampler)
for epoch in range(start_epoch, n_epochs):
    if is_distributed:
        sampler.set_epoch(epoch)
    train(loader)

DistributedDataParallel()

torch.nn.parallel.DistributedDataParallel(module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True, process_group=None, bucket_cap_mb=25, find_unused_parameters=False, check_reduction=False, gradient_as_bucket_view=False, static_graph=False)

DDP为了保证不同gpu上的模型能完成同步,需要进行较为繁琐的设置。
使用这个类要求你先对torch.distributed进行初始化,通过调用方法。

torch.distributed.init_process_group(backend=None, init_method=None, timeout=datetime.timedelta(seconds=1800), world_size=- 1, rank=- 1, store=None, group_name='', pg_options=None)

在DistributedDataParallel()中,第一个参数module是你想要并行话的module,在训练中也就是你的模型。
参数device_id和output_device代表你的moudle的CUDAdevice和你的output的CUDAdevice。
process_group如果为None,会默认使用你的torch.distributed.init_process_group。

给出一个简单的示例

torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...')
net = torch.nn.parallel.DistributedDataParallel(model)

为了让你的DDP能够在N个GPU上使用,你需要产生N个进程,并且保证每个进程独立的工作在一个GPU上。有两个方法可以完成这个设置。
第一个方法是:

torch.cuda.set_device(i)

在每个进程中,你需要进行以下的设置。

torch.distributed.init_process_group(
    backend='nccl', world_size=N, init_method='...'
)
model = DistributedDataParallel(model, device_ids=[i], output_device=i)

使用DDP

建议直接参考https://github.com/pytorch/examples/blob/main/distributed/ddp-tutorial-series/multigpu.py。给出了一个完整的模版,可以直接套用。

代码示例

教程中给出了一个比较简单但完整的DDP示例。
在这个例子中,init_process_group被封装到一个setup函数里。

import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

然后定义一个作为例子的小模型。使用torch.multiprocess产生多个进程,rank表示当前的进程数,或者说CUDA id。world_size代表了gpu的个数。在每个进程中,将你的model放到对应的device上。
这个例子中没有加入dataset的部分。只有DDP中模型的使用。

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic(rank, world_size):
    print(f"Running basic DDP example on rank {rank}.")
    setup(rank, world_size)

    # create model and move it to GPU with id rank
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()


def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)

multiprocessing.spawn()

torch.multiprocessing.spawn(fn, args=(), nprocs=1, join=True, daemon=False, start_method='spawn')

torch.multiprocessing基本上就是在python的multiprocessing上进行了一些改动。具体的不讲了【因为我也没有看】,我们只说一下spawn。

spawn的输入包括 fn:目标函数, args:函数的参数,nprocs:需要的进程数。

在使用这个方法后,multiprocessing会以‘spawn’的方法来创建新的进程。创建的数量由nprocs决定。

for i in range(nprocs):
        error_queue = mp.SimpleQueue()
        process = mp.Process(
            target=_wrap,
            args=(fn, i, args, error_queue),
            daemon=daemon,
        )
        process.start()
        error_queues.append(error_queue)
        processes.append(process)

可以看到在下方代码的mp.Process中使用的target是一个名为_wrap的函数。这个函数把当前process对应的index和args组合在一起,作为输入传入到fn中去。所以理论上来讲,你定义的fn的传入参数数量至少为1个,也就是当前进程的index。而在使用torch中的mp.spawn()时,args里面你不能把这个参数写出来,而只需要写第二个以及以后的参数。

def _wrap(fn, i, args, error_queue):
    # prctl(2) is a Linux specific system call.
    # On other systems the following function call has no effect.
    # This is set to ensure that non-daemonic child processes can
    # terminate if their parent terminates before they do.
    _prctl_pr_set_pdeathsig(signal.SIGINT)

    try:
        fn(i, *args)  # index和args一起作为args传给了fn
    except KeyboardInterrupt:
        pass  # SIGINT; Killed by parent, do nothing
    except Exception:
        # Propagate exception to parent process, keeping original traceback
        import traceback
        error_queue.put(traceback.format_exc())
        sys.exit(1)

save and load checkpoints

模型的保存和加载方法和之前的使用上区别不大。只是加了一点限制。
在下方的示例代码中,在DDP中保存state_dict()时用的是model.module.state_dict()。确认了一下发现在ddp(model)的示例化时,model会被赋值给self.module。
但是有的例子中并没有使用module.state_dict()来完成保存。
这里为了防止重复保存model,限制了只在gpu_id == 0 时才进行模型的保存。

- ckp = self.model.state_dict()
+ ckp = self.model.module.state_dict()
...
...
- if epoch % self.save_every == 0:
+ if self.gpu_id == 0 and epoch % self.save_every == 0:
   self._save_checkpoint(epoch)

一个优化方法是将模型的每次更新都保存在一个进程上然后被加载到所有进程中,这样可以防止多次写入。因为所有的进程都是从一样的参数开始的,并且bp中的梯度计算总是同步的,因此不同gpu上的optimizer也要保持同步。

make sure no process starts loading before the saving is finished。

在下面的例子中,在rank == 0的情况下才会进行model的保存。使用dist.barrier()限制了save期间的别的process对模型的加载。

def demo_checkpoint(rank, world_size):
    print(f"Running DDP checkpoint example on rank {rank}.")
    setup(rank, world_size)

    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])


    CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
    if rank == 0:
        # All processes should see same parameters as they all start from same
        # random parameters and gradients are synchronized in backward passes.
        # Therefore, saving it in one process is sufficient.
        torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)

    # Use a barrier() to make sure that process 1 loads the model after process
    # 0 saves it.
    dist.barrier()
    # configure map_location properly
    map_location = {
   
   'cuda:%d' % 0: 'cuda:%d' % rank}
    ddp_model.load_state_dict(
        torch.load(CHECKPOINT_PATH, map_location=map_location))

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)

    loss_fn(outputs, labels).backward()
    optimizer.step()

    # Not necessary to use a dist.barrier() to guard the file deletion below
    # as the AllReduce ops in the backward pass of DDP already served as
    # a synchronization.

    if rank == 0:
        os.remove(CHECKPOINT_PATH)

    cleanup()
相关实践学习
基于阿里云DeepGPU实例,用AI画唯美国风少女
本实验基于阿里云DeepGPU实例,使用aiacctorch加速stable-diffusion-webui,用AI画唯美国风少女,可提升性能至高至原性能的2.6倍。
相关文章
|
3天前
|
机器学习/深度学习 算法
揭秘深度学习中的对抗性网络:理论与实践
【5月更文挑战第18天】 在深度学习领域的众多突破中,对抗性网络(GANs)以其独特的机制和强大的生成能力受到广泛关注。不同于传统的监督学习方法,GANs通过同时训练生成器与判别器两个模型,实现了无监督学习下的高效数据生成。本文将深入探讨对抗性网络的核心原理,解析其数学模型,并通过案例分析展示GANs在图像合成、风格迁移及增强学习等领域的应用。此外,我们还将讨论当前GANs面临的挑战以及未来的发展方向,为读者提供一个全面而深入的视角以理解这一颠覆性技术。
|
4天前
|
机器学习/深度学习 人工智能 算法
【AI】从零构建深度学习框架实践
【5月更文挑战第16天】 本文介绍了从零构建一个轻量级的深度学习框架tinynn,旨在帮助读者理解深度学习的基本组件和框架设计。构建过程包括设计框架架构、实现基本功能、模型定义、反向传播算法、训练和推理过程以及性能优化。文章详细阐述了网络层、张量、损失函数、优化器等组件的抽象和实现,并给出了一个基于MNIST数据集的分类示例,与TensorFlow进行了简单对比。tinynn的源代码可在GitHub上找到,目前支持多种层、损失函数和优化器,适用于学习和实验新算法。
60 2
|
6天前
|
机器学习/深度学习 人工智能 自然语言处理
深度理解深度学习:从理论到实践的探索
【5月更文挑战第3天】 在人工智能的浪潮中,深度学习以其卓越的性能和广泛的应用成为了研究的热点。本文将深入探讨深度学习的核心理论,解析其背后的数学原理,并通过实际案例分析如何将这些理论应用于解决现实世界的问题。我们将从神经网络的基础结构出发,逐步过渡到复杂的模型架构,同时讨论优化算法和正则化技巧。通过本文,读者将对深度学习有一个全面而深刻的认识,并能够在实践中更加得心应手地应用这些技术。
|
6天前
|
机器学习/深度学习 人工智能 缓存
安卓应用性能优化实践探索深度学习在图像识别中的应用进展
【4月更文挑战第30天】随着智能手机的普及,移动应用已成为用户日常生活的重要组成部分。对于安卓开发者而言,确保应用流畅、高效地运行在多样化的硬件上是一大挑战。本文将探讨针对安卓平台进行应用性能优化的策略和技巧,包括内存管理、多线程处理、UI渲染效率提升以及电池使用优化,旨在帮助开发者构建更加健壮、响应迅速的安卓应用。 【4月更文挑战第30天】 随着人工智能技术的迅猛发展,深度学习已成为推动计算机视觉领域革新的核心动力。本篇文章将深入分析深度学习技术在图像识别任务中的最新应用进展,并探讨其面临的挑战与未来发展趋势。通过梳理卷积神经网络(CNN)的优化策略、转移学习的实践应用以及增强学习与生成对
|
6天前
|
机器学习/深度学习 人工智能 自然语言处理
从零开始学习深度学习:入门指南与实践建议
本文将引导读者进入深度学习领域的大门,从基础概念到实际应用,为初学者提供全面的学习指南和实践建议。通过系统化的学习路径规划和案例实践,帮助读者快速掌握深度学习的核心知识和技能,迈出在人工智能领域的第一步。
|
6天前
|
机器学习/深度学习 Python
有没有一些开源的深度学习项目可以帮助我实践所学的知识?
【2月更文挑战第14天】【2月更文挑战第40篇】有没有一些开源的深度学习项目可以帮助我实践所学的知识?
|
6天前
|
机器学习/深度学习 人工智能 算法
【深度学习】因果推断与机器学习的高级实践 | 数学建模
【深度学习】因果推断与机器学习的高级实践 | 数学建模
|
6天前
|
机器学习/深度学习 人工智能 算法
基于AidLux的工业视觉少样本缺陷检测实战应用---深度学习分割模型UNET的实践部署
  工业视觉在生产和制造中扮演着关键角色,而缺陷检测则是确保产品质量和生产效率的重要环节。工业视觉的前景与发展在于其在生产制造领域的关键作用,尤其是在少样本缺陷检测方面,借助AidLux技术和深度学习分割模型UNET的实践应用,深度学习分割模型UNET的实践部署变得至关重要。
72 1
|
6天前
|
机器学习/深度学习 搜索推荐 算法
推荐系统算法的研究与实践:协同过滤、基于内容的推荐和深度学习推荐模型
推荐系统算法的研究与实践:协同过滤、基于内容的推荐和深度学习推荐模型
278 1
|
8月前
|
机器学习/深度学习 自然语言处理
深度学习LSTM网络:自然语言处理实践
深度学习LSTM网络:自然语言处理实践
50 0