深度学习实践篇 第九章:DDP

简介: 简要介绍了DDP的使用。

参考教程:
what is DDP
pytorch distributed overview

@[TOC]

DDP介绍

什么是DDP

DDP的全称是DistributedDataParallel,它允许使用pytorch使用数据并行训练。
在非分布式训练中,你的模型被放在一个gpu上,你的模型接收来自一个dataloader的inputbatch,它对输入进行前向传播并且计算损失,然后进行反向传播并计算参数的梯度。优化器在这个过程中对模型的参数进行更新。
当我们现在多个gpu上进行训练时,DDP会在每个GPU上都启动一个进程,每个进程中都有一个模型的复制品replica(local copy),每一个模型和优化器的复制品都是相同的。模型的参数初始化相同,优化器也使用一样的随机种子。DDP在训练过程中维持了这种一致性。
现在,每个gpu的模型都是一样的。我们仍然从dataloader中获取inputbatch,但是这个时候我们使用的是DistributedSampler。这个sampler保证了每个模型接收到的inputbatch是不同的,这就是“data parallel” in DDP。
在每个进程中,模型接收到一个不同的输入,并且locally的进行前向传播和后向传播。因为输入是不同的,所以梯度的累积也是不同的。使用optimizer.step()会产生不同的结果。比如说我们现在有4个gpu,在4个gpu上有4个模型的local copy,那么这四个模型就是不同的模型,而不是我们想要的一个分布式的模型。
DDP中的同步机制,会把来自四个replicas的梯度累加起来,其中使用了一个算法,在replicas间进行通信,它不会等待所有的梯度都计算完成,而是在计算过程中就保持着通信,这保证了你的gpu总是在工作的。
image.png

综合来说,就是在n个gpu上的训练相当于同时进行n个模型的训练,这些模型是完全一样的,实时同步的,只不过传入的数据不一样。

DistributedSampler()

在之前介绍数据集构建的时候曾经提到过sampler的概念。假如dataloader中的shuffle = True,你使用的就是一个randomsampler;假如dataloader中的shuffle = False,你使用的就是一个sequentialsampler。

torch.utils.data.distributed.DistributedSampler(dataset, num_replicas=None, rank=None, shuffle=True, seed=0, drop_last=False)

DistributedSampler传入的第一个参数是dataset,它要求这个Dataset有一个constant的大小。iterable dataset也可以使用DDP,参考https://github.com/Lightning-AI/lightning/issues/15734。我先不看了,用的时候再说。
第二个参数是num_replicas,你想要复制的个数。第三个参数是rank,是当前的process的index。shuffle和seed主要是控制你的dataset是否要shuffle和随机种子。

DistributedSampler()一般和torch.nn.parallel.DistributedDataParallel组合使用。在这种情况下,每一个进程都会传递一个DistributedSampler实例,并且加载原始数据中彼此完全不同的子集。

在构造函数初始化阶段,会计算每个replica中分配到的samples数。

if self.drop_last and len(self.dataset) % self.num_replicas != 0:  # type: ignore[arg-type]
            # Split to nearest available length that is evenly divisible.
            # This is to ensure each rank receives the same amount of data when
            # using this Sampler.
            self.num_samples = math.ceil(
                (len(self.dataset) - self.num_replicas) / self.num_replicas  # type: ignore[arg-type]
            )
        else:
            self.num_samples = math.ceil(len(self.dataset) / self.num_replicas)  # type: ignore[arg-type]

最终获得的dataset中数据的总数就是

self.total_size = self.num_samples * self.num_replicas

在取数据的时候,会把不同的数据分配给不同的replicas。

indices = indices[self.rank:self.total_size:self.num_replicas]

在使用DistributedSampler时,在每个epoch都需要调用set_epoch(epoch)函数,这样每个epoch里数据都会按照不同的顺序获取,如果不调用的话,获取的顺序将始终保持一致。因为它随机种子的生成也和当前的epoch有关系。

g.manual_seed(self.seed + self.epoch)

下面给出了一个使用的例子。

sampler = DistributedSampler(dataset) if is_distributed else None
loader = DataLoader(dataset, shuffle=(sampler is None),
                    sampler=sampler)
for epoch in range(start_epoch, n_epochs):
    if is_distributed:
        sampler.set_epoch(epoch)
    train(loader)

DistributedDataParallel()

torch.nn.parallel.DistributedDataParallel(module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True, process_group=None, bucket_cap_mb=25, find_unused_parameters=False, check_reduction=False, gradient_as_bucket_view=False, static_graph=False)

DDP为了保证不同gpu上的模型能完成同步,需要进行较为繁琐的设置。
使用这个类要求你先对torch.distributed进行初始化,通过调用方法。

torch.distributed.init_process_group(backend=None, init_method=None, timeout=datetime.timedelta(seconds=1800), world_size=- 1, rank=- 1, store=None, group_name='', pg_options=None)

在DistributedDataParallel()中,第一个参数module是你想要并行话的module,在训练中也就是你的模型。
参数device_id和output_device代表你的moudle的CUDAdevice和你的output的CUDAdevice。
process_group如果为None,会默认使用你的torch.distributed.init_process_group。

给出一个简单的示例

torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...')
net = torch.nn.parallel.DistributedDataParallel(model)

为了让你的DDP能够在N个GPU上使用,你需要产生N个进程,并且保证每个进程独立的工作在一个GPU上。有两个方法可以完成这个设置。
第一个方法是:

torch.cuda.set_device(i)

在每个进程中,你需要进行以下的设置。

torch.distributed.init_process_group(
    backend='nccl', world_size=N, init_method='...'
)
model = DistributedDataParallel(model, device_ids=[i], output_device=i)

使用DDP

建议直接参考https://github.com/pytorch/examples/blob/main/distributed/ddp-tutorial-series/multigpu.py。给出了一个完整的模版,可以直接套用。

代码示例

教程中给出了一个比较简单但完整的DDP示例。
在这个例子中,init_process_group被封装到一个setup函数里。

import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

然后定义一个作为例子的小模型。使用torch.multiprocess产生多个进程,rank表示当前的进程数,或者说CUDA id。world_size代表了gpu的个数。在每个进程中,将你的model放到对应的device上。
这个例子中没有加入dataset的部分。只有DDP中模型的使用。

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic(rank, world_size):
    print(f"Running basic DDP example on rank {rank}.")
    setup(rank, world_size)

    # create model and move it to GPU with id rank
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()


def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)

multiprocessing.spawn()

torch.multiprocessing.spawn(fn, args=(), nprocs=1, join=True, daemon=False, start_method='spawn')

torch.multiprocessing基本上就是在python的multiprocessing上进行了一些改动。具体的不讲了【因为我也没有看】,我们只说一下spawn。

spawn的输入包括 fn:目标函数, args:函数的参数,nprocs:需要的进程数。

在使用这个方法后,multiprocessing会以‘spawn’的方法来创建新的进程。创建的数量由nprocs决定。

for i in range(nprocs):
        error_queue = mp.SimpleQueue()
        process = mp.Process(
            target=_wrap,
            args=(fn, i, args, error_queue),
            daemon=daemon,
        )
        process.start()
        error_queues.append(error_queue)
        processes.append(process)

可以看到在下方代码的mp.Process中使用的target是一个名为_wrap的函数。这个函数把当前process对应的index和args组合在一起,作为输入传入到fn中去。所以理论上来讲,你定义的fn的传入参数数量至少为1个,也就是当前进程的index。而在使用torch中的mp.spawn()时,args里面你不能把这个参数写出来,而只需要写第二个以及以后的参数。

def _wrap(fn, i, args, error_queue):
    # prctl(2) is a Linux specific system call.
    # On other systems the following function call has no effect.
    # This is set to ensure that non-daemonic child processes can
    # terminate if their parent terminates before they do.
    _prctl_pr_set_pdeathsig(signal.SIGINT)

    try:
        fn(i, *args)  # index和args一起作为args传给了fn
    except KeyboardInterrupt:
        pass  # SIGINT; Killed by parent, do nothing
    except Exception:
        # Propagate exception to parent process, keeping original traceback
        import traceback
        error_queue.put(traceback.format_exc())
        sys.exit(1)

save and load checkpoints

模型的保存和加载方法和之前的使用上区别不大。只是加了一点限制。
在下方的示例代码中,在DDP中保存state_dict()时用的是model.module.state_dict()。确认了一下发现在ddp(model)的示例化时,model会被赋值给self.module。
但是有的例子中并没有使用module.state_dict()来完成保存。
这里为了防止重复保存model,限制了只在gpu_id == 0 时才进行模型的保存。

- ckp = self.model.state_dict()
+ ckp = self.model.module.state_dict()
...
...
- if epoch % self.save_every == 0:
+ if self.gpu_id == 0 and epoch % self.save_every == 0:
   self._save_checkpoint(epoch)

一个优化方法是将模型的每次更新都保存在一个进程上然后被加载到所有进程中,这样可以防止多次写入。因为所有的进程都是从一样的参数开始的,并且bp中的梯度计算总是同步的,因此不同gpu上的optimizer也要保持同步。

make sure no process starts loading before the saving is finished。

在下面的例子中,在rank == 0的情况下才会进行model的保存。使用dist.barrier()限制了save期间的别的process对模型的加载。

def demo_checkpoint(rank, world_size):
    print(f"Running DDP checkpoint example on rank {rank}.")
    setup(rank, world_size)

    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])


    CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
    if rank == 0:
        # All processes should see same parameters as they all start from same
        # random parameters and gradients are synchronized in backward passes.
        # Therefore, saving it in one process is sufficient.
        torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)

    # Use a barrier() to make sure that process 1 loads the model after process
    # 0 saves it.
    dist.barrier()
    # configure map_location properly
    map_location = {
   
   'cuda:%d' % 0: 'cuda:%d' % rank}
    ddp_model.load_state_dict(
        torch.load(CHECKPOINT_PATH, map_location=map_location))

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)

    loss_fn(outputs, labels).backward()
    optimizer.step()

    # Not necessary to use a dist.barrier() to guard the file deletion below
    # as the AllReduce ops in the backward pass of DDP already served as
    # a synchronization.

    if rank == 0:
        os.remove(CHECKPOINT_PATH)

    cleanup()
相关实践学习
部署Stable Diffusion玩转AI绘画(GPU云服务器)
本实验通过在ECS上从零开始部署Stable Diffusion来进行AI绘画创作,开启AIGC盲盒。
相关文章
|
1月前
|
机器学习/深度学习 人工智能 自然语言处理
深度学习的新篇章:从理论到实践的飞跃####
本文深入剖析了深度学习的最新进展,探讨了其背后的理论基础与实际应用之间的桥梁。通过实例展示了深度学习如何革新计算机视觉、自然语言处理等领域,并展望了其未来可能带来的颠覆性变化。文章旨在为读者提供一个清晰的视角,理解深度学习不仅是技术的飞跃,更是推动社会进步的重要力量。 ####
139 61
|
9天前
|
机器学习/深度学习 人工智能 TensorFlow
人工智能浪潮下的自我修养:从Python编程入门到深度学习实践
【10月更文挑战第39天】本文旨在为初学者提供一条清晰的道路,从Python基础语法的掌握到深度学习领域的探索。我们将通过简明扼要的语言和实际代码示例,引导读者逐步构建起对人工智能技术的理解和应用能力。文章不仅涵盖Python编程的基础,还将深入探讨深度学习的核心概念、工具和实战技巧,帮助读者在AI的浪潮中找到自己的位置。
|
1月前
|
机器学习/深度学习 人工智能 TensorFlow
深度学习的探索之旅:从基础到实践
【10月更文挑战第4天】本文将带领读者踏上一段深度学习的探索之旅。我们将从深度学习的基础概念出发,逐步深入到模型构建、训练和优化的实践应用。通过通俗易懂的语言和实际代码示例,本文旨在帮助初学者理解深度学习的核心原理,并鼓励他们动手实践,以加深对这一强大技术的理解和应用。无论你是AI领域的新手还是有一定经验的开发者,这篇文章都将为你提供有价值的见解和指导。
50 5
|
1天前
|
机器学习/深度学习 人工智能 自然语言处理
深度学习中的卷积神经网络(CNN): 从理论到实践
本文将深入浅出地介绍卷积神经网络(CNN)的工作原理,并带领读者通过一个简单的图像分类项目,实现从理论到代码的转变。我们将探索CNN如何识别和处理图像数据,并通过实例展示如何训练一个有效的CNN模型。无论你是深度学习领域的新手还是希望扩展你的技术栈,这篇文章都将为你提供宝贵的知识和技能。
17 7
|
13天前
|
机器学习/深度学习 人工智能 自然语言处理
深度学习中的卷积神经网络:从理论到实践
【10月更文挑战第35天】在人工智能的浪潮中,深度学习技术以其强大的数据处理能力成为科技界的宠儿。其中,卷积神经网络(CNN)作为深度学习的一个重要分支,在图像识别和视频分析等领域展现出了惊人的潜力。本文将深入浅出地介绍CNN的工作原理,并结合实际代码示例,带领读者从零开始构建一个简单的CNN模型,探索其在图像分类任务中的应用。通过本文,读者不仅能够理解CNN背后的数学原理,还能学会如何利用现代深度学习框架实现自己的CNN模型。
|
10天前
|
机器学习/深度学习 数据采集 自然语言处理
深入浅出深度学习:从理论到实践
【10月更文挑战第38天】本文旨在通过浅显易懂的语言和直观的代码示例,带领读者探索深度学习的奥秘。我们将从深度学习的基本概念出发,逐步深入到模型构建、训练以及应用实例,让初学者也能轻松入门。文章不仅介绍了深度学习的原理,还提供了实战操作指南,帮助读者在实践中加深理解。无论你是编程新手还是有一定基础的学习者,都能在这篇文章中找到有价值的内容。让我们一起开启深度学习之旅吧!
|
30天前
|
机器学习/深度学习 调度 计算机视觉
深度学习中的学习率调度:循环学习率、SGDR、1cycle 等方法介绍及实践策略研究
本文探讨了多种学习率调度策略在神经网络训练中的应用,强调了选择合适学习率的重要性。文章介绍了阶梯式衰减、余弦退火、循环学习率等策略,并分析了它们在不同实验设置下的表现。研究表明,循环学习率和SGDR等策略在提高模型性能和加快训练速度方面表现出色,而REX调度则在不同预算条件下表现稳定。这些策略为深度学习实践者提供了实用的指导。
36 2
深度学习中的学习率调度:循环学习率、SGDR、1cycle 等方法介绍及实践策略研究
|
12天前
|
机器学习/深度学习 自然语言处理 语音技术
深度学习的奇妙之旅:从理论到实践
【10月更文挑战第36天】在本文中,我们将一起探索深度学习的神秘世界。我们将首先了解深度学习的基本概念和原理,然后通过一个简单的Python代码示例,学习如何使用深度学习库Keras进行图像分类。无论你是深度学习的初学者,还是有一定基础的学习者,都可以从这篇文章中获得新的知识和启示。
|
17天前
|
机器学习/深度学习 监控 PyTorch
深度学习工程实践:PyTorch Lightning与Ignite框架的技术特性对比分析
在深度学习框架的选择上,PyTorch Lightning和Ignite代表了两种不同的技术路线。本文将从技术实现的角度,深入分析这两个框架在实际应用中的差异,为开发者提供客观的技术参考。
36 7
|
25天前
|
机器学习/深度学习 数据采集 人工智能
深度学习的魔法:从理论到实践的探索####
【10月更文挑战第22天】 本文深入探讨了深度学习这一现代人工智能领域的璀璨明珠,通过生动实例与通俗语言,揭示了其背后的原理、发展历程及在多个行业的应用潜力。文章首先概述了深度学习的基本概念,随后详细解析了神经网络的核心构成,并探讨了当前面临的挑战与未来趋势。最终,通过实际案例展示了深度学习如何改变世界,为读者呈现一幅技术革新引领未来的画卷。 ####
26 3

热门文章

最新文章