深度学习实践篇 第九章:DDP

简介: 简要介绍了DDP的使用。

参考教程:
what is DDP
pytorch distributed overview

@[TOC]

DDP介绍

什么是DDP

DDP的全称是DistributedDataParallel,它允许使用pytorch使用数据并行训练。
在非分布式训练中,你的模型被放在一个gpu上,你的模型接收来自一个dataloader的inputbatch,它对输入进行前向传播并且计算损失,然后进行反向传播并计算参数的梯度。优化器在这个过程中对模型的参数进行更新。
当我们现在多个gpu上进行训练时,DDP会在每个GPU上都启动一个进程,每个进程中都有一个模型的复制品replica(local copy),每一个模型和优化器的复制品都是相同的。模型的参数初始化相同,优化器也使用一样的随机种子。DDP在训练过程中维持了这种一致性。
现在,每个gpu的模型都是一样的。我们仍然从dataloader中获取inputbatch,但是这个时候我们使用的是DistributedSampler。这个sampler保证了每个模型接收到的inputbatch是不同的,这就是“data parallel” in DDP。
在每个进程中,模型接收到一个不同的输入,并且locally的进行前向传播和后向传播。因为输入是不同的,所以梯度的累积也是不同的。使用optimizer.step()会产生不同的结果。比如说我们现在有4个gpu,在4个gpu上有4个模型的local copy,那么这四个模型就是不同的模型,而不是我们想要的一个分布式的模型。
DDP中的同步机制,会把来自四个replicas的梯度累加起来,其中使用了一个算法,在replicas间进行通信,它不会等待所有的梯度都计算完成,而是在计算过程中就保持着通信,这保证了你的gpu总是在工作的。
image.png

综合来说,就是在n个gpu上的训练相当于同时进行n个模型的训练,这些模型是完全一样的,实时同步的,只不过传入的数据不一样。

DistributedSampler()

在之前介绍数据集构建的时候曾经提到过sampler的概念。假如dataloader中的shuffle = True,你使用的就是一个randomsampler;假如dataloader中的shuffle = False,你使用的就是一个sequentialsampler。

torch.utils.data.distributed.DistributedSampler(dataset, num_replicas=None, rank=None, shuffle=True, seed=0, drop_last=False)

DistributedSampler传入的第一个参数是dataset,它要求这个Dataset有一个constant的大小。iterable dataset也可以使用DDP,参考https://github.com/Lightning-AI/lightning/issues/15734。我先不看了,用的时候再说。
第二个参数是num_replicas,你想要复制的个数。第三个参数是rank,是当前的process的index。shuffle和seed主要是控制你的dataset是否要shuffle和随机种子。

DistributedSampler()一般和torch.nn.parallel.DistributedDataParallel组合使用。在这种情况下,每一个进程都会传递一个DistributedSampler实例,并且加载原始数据中彼此完全不同的子集。

在构造函数初始化阶段,会计算每个replica中分配到的samples数。

if self.drop_last and len(self.dataset) % self.num_replicas != 0:  # type: ignore[arg-type]
            # Split to nearest available length that is evenly divisible.
            # This is to ensure each rank receives the same amount of data when
            # using this Sampler.
            self.num_samples = math.ceil(
                (len(self.dataset) - self.num_replicas) / self.num_replicas  # type: ignore[arg-type]
            )
        else:
            self.num_samples = math.ceil(len(self.dataset) / self.num_replicas)  # type: ignore[arg-type]

最终获得的dataset中数据的总数就是

self.total_size = self.num_samples * self.num_replicas

在取数据的时候,会把不同的数据分配给不同的replicas。

indices = indices[self.rank:self.total_size:self.num_replicas]

在使用DistributedSampler时,在每个epoch都需要调用set_epoch(epoch)函数,这样每个epoch里数据都会按照不同的顺序获取,如果不调用的话,获取的顺序将始终保持一致。因为它随机种子的生成也和当前的epoch有关系。

g.manual_seed(self.seed + self.epoch)

下面给出了一个使用的例子。

sampler = DistributedSampler(dataset) if is_distributed else None
loader = DataLoader(dataset, shuffle=(sampler is None),
                    sampler=sampler)
for epoch in range(start_epoch, n_epochs):
    if is_distributed:
        sampler.set_epoch(epoch)
    train(loader)

DistributedDataParallel()

torch.nn.parallel.DistributedDataParallel(module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True, process_group=None, bucket_cap_mb=25, find_unused_parameters=False, check_reduction=False, gradient_as_bucket_view=False, static_graph=False)

DDP为了保证不同gpu上的模型能完成同步,需要进行较为繁琐的设置。
使用这个类要求你先对torch.distributed进行初始化,通过调用方法。

torch.distributed.init_process_group(backend=None, init_method=None, timeout=datetime.timedelta(seconds=1800), world_size=- 1, rank=- 1, store=None, group_name='', pg_options=None)

在DistributedDataParallel()中,第一个参数module是你想要并行话的module,在训练中也就是你的模型。
参数device_id和output_device代表你的moudle的CUDAdevice和你的output的CUDAdevice。
process_group如果为None,会默认使用你的torch.distributed.init_process_group。

给出一个简单的示例

torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...')
net = torch.nn.parallel.DistributedDataParallel(model)

为了让你的DDP能够在N个GPU上使用,你需要产生N个进程,并且保证每个进程独立的工作在一个GPU上。有两个方法可以完成这个设置。
第一个方法是:

torch.cuda.set_device(i)

在每个进程中,你需要进行以下的设置。

torch.distributed.init_process_group(
    backend='nccl', world_size=N, init_method='...'
)
model = DistributedDataParallel(model, device_ids=[i], output_device=i)

使用DDP

建议直接参考https://github.com/pytorch/examples/blob/main/distributed/ddp-tutorial-series/multigpu.py。给出了一个完整的模版,可以直接套用。

代码示例

教程中给出了一个比较简单但完整的DDP示例。
在这个例子中,init_process_group被封装到一个setup函数里。

import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

然后定义一个作为例子的小模型。使用torch.multiprocess产生多个进程,rank表示当前的进程数,或者说CUDA id。world_size代表了gpu的个数。在每个进程中,将你的model放到对应的device上。
这个例子中没有加入dataset的部分。只有DDP中模型的使用。

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic(rank, world_size):
    print(f"Running basic DDP example on rank {rank}.")
    setup(rank, world_size)

    # create model and move it to GPU with id rank
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()


def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)

multiprocessing.spawn()

torch.multiprocessing.spawn(fn, args=(), nprocs=1, join=True, daemon=False, start_method='spawn')

torch.multiprocessing基本上就是在python的multiprocessing上进行了一些改动。具体的不讲了【因为我也没有看】,我们只说一下spawn。

spawn的输入包括 fn:目标函数, args:函数的参数,nprocs:需要的进程数。

在使用这个方法后,multiprocessing会以‘spawn’的方法来创建新的进程。创建的数量由nprocs决定。

for i in range(nprocs):
        error_queue = mp.SimpleQueue()
        process = mp.Process(
            target=_wrap,
            args=(fn, i, args, error_queue),
            daemon=daemon,
        )
        process.start()
        error_queues.append(error_queue)
        processes.append(process)

可以看到在下方代码的mp.Process中使用的target是一个名为_wrap的函数。这个函数把当前process对应的index和args组合在一起,作为输入传入到fn中去。所以理论上来讲,你定义的fn的传入参数数量至少为1个,也就是当前进程的index。而在使用torch中的mp.spawn()时,args里面你不能把这个参数写出来,而只需要写第二个以及以后的参数。

def _wrap(fn, i, args, error_queue):
    # prctl(2) is a Linux specific system call.
    # On other systems the following function call has no effect.
    # This is set to ensure that non-daemonic child processes can
    # terminate if their parent terminates before they do.
    _prctl_pr_set_pdeathsig(signal.SIGINT)

    try:
        fn(i, *args)  # index和args一起作为args传给了fn
    except KeyboardInterrupt:
        pass  # SIGINT; Killed by parent, do nothing
    except Exception:
        # Propagate exception to parent process, keeping original traceback
        import traceback
        error_queue.put(traceback.format_exc())
        sys.exit(1)

save and load checkpoints

模型的保存和加载方法和之前的使用上区别不大。只是加了一点限制。
在下方的示例代码中,在DDP中保存state_dict()时用的是model.module.state_dict()。确认了一下发现在ddp(model)的示例化时,model会被赋值给self.module。
但是有的例子中并没有使用module.state_dict()来完成保存。
这里为了防止重复保存model,限制了只在gpu_id == 0 时才进行模型的保存。

- ckp = self.model.state_dict()
+ ckp = self.model.module.state_dict()
...
...
- if epoch % self.save_every == 0:
+ if self.gpu_id == 0 and epoch % self.save_every == 0:
   self._save_checkpoint(epoch)

一个优化方法是将模型的每次更新都保存在一个进程上然后被加载到所有进程中,这样可以防止多次写入。因为所有的进程都是从一样的参数开始的,并且bp中的梯度计算总是同步的,因此不同gpu上的optimizer也要保持同步。

make sure no process starts loading before the saving is finished。

在下面的例子中,在rank == 0的情况下才会进行model的保存。使用dist.barrier()限制了save期间的别的process对模型的加载。

def demo_checkpoint(rank, world_size):
    print(f"Running DDP checkpoint example on rank {rank}.")
    setup(rank, world_size)

    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])


    CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
    if rank == 0:
        # All processes should see same parameters as they all start from same
        # random parameters and gradients are synchronized in backward passes.
        # Therefore, saving it in one process is sufficient.
        torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)

    # Use a barrier() to make sure that process 1 loads the model after process
    # 0 saves it.
    dist.barrier()
    # configure map_location properly
    map_location = {
   
   'cuda:%d' % 0: 'cuda:%d' % rank}
    ddp_model.load_state_dict(
        torch.load(CHECKPOINT_PATH, map_location=map_location))

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)

    loss_fn(outputs, labels).backward()
    optimizer.step()

    # Not necessary to use a dist.barrier() to guard the file deletion below
    # as the AllReduce ops in the backward pass of DDP already served as
    # a synchronization.

    if rank == 0:
        os.remove(CHECKPOINT_PATH)

    cleanup()
相关实践学习
在云上部署ChatGLM2-6B大模型(GPU版)
ChatGLM2-6B是由智谱AI及清华KEG实验室于2023年6月发布的中英双语对话开源大模型。通过本实验,可以学习如何配置AIGC开发环境,如何部署ChatGLM2-6B大模型。
相关文章
|
机器学习/深度学习 数据采集 自然语言处理
深度学习实践技巧:提升模型性能的详尽指南
深度学习模型在图像分类、自然语言处理、时间序列分析等多个领域都表现出了卓越的性能,但在实际应用中,为了使模型达到最佳效果,常规的标准流程往往不足。本文提供了多种深度学习实践技巧,包括数据预处理、模型设计优化、训练策略和评价与调参等方面的详细操作和代码示例,希望能够为应用实战提供有效的指导和支持。
|
机器学习/深度学习 传感器 数据采集
深度学习在故障检测中的应用:从理论到实践
深度学习在故障检测中的应用:从理论到实践
1272 6
|
机器学习/深度学习 人工智能 TensorFlow
人工智能浪潮下的自我修养:从Python编程入门到深度学习实践
【10月更文挑战第39天】本文旨在为初学者提供一条清晰的道路,从Python基础语法的掌握到深度学习领域的探索。我们将通过简明扼要的语言和实际代码示例,引导读者逐步构建起对人工智能技术的理解和应用能力。文章不仅涵盖Python编程的基础,还将深入探讨深度学习的核心概念、工具和实战技巧,帮助读者在AI的浪潮中找到自己的位置。
|
机器学习/深度学习 调度 计算机视觉
深度学习中的学习率调度:循环学习率、SGDR、1cycle 等方法介绍及实践策略研究
本文探讨了多种学习率调度策略在神经网络训练中的应用,强调了选择合适学习率的重要性。文章介绍了阶梯式衰减、余弦退火、循环学习率等策略,并分析了它们在不同实验设置下的表现。研究表明,循环学习率和SGDR等策略在提高模型性能和加快训练速度方面表现出色,而REX调度则在不同预算条件下表现稳定。这些策略为深度学习实践者提供了实用的指导。
793 2
深度学习中的学习率调度:循环学习率、SGDR、1cycle 等方法介绍及实践策略研究
|
机器学习/深度学习 人工智能 自然语言处理
揭秘人工智能:深度学习的奥秘与实践
在本文中,我们将深入浅出地探索深度学习的神秘面纱。从基础概念到实际应用,你将获得一份简明扼要的指南,助你理解并运用这一前沿技术。我们避开复杂的数学公式和冗长的论述,以直观的方式呈现深度学习的核心原理和应用实例。无论你是技术新手还是有经验的开发者,这篇文章都将为你打开一扇通往人工智能新世界的大门。
|
机器学习/深度学习 算法 TensorFlow
深度学习中的自编码器:从理论到实践
在这篇文章中,我们将深入探讨深度学习的一个重要分支——自编码器。自编码器是一种无监督学习算法,它可以学习数据的有效表示。我们将首先介绍自编码器的基本概念和工作原理,然后通过一个简单的Python代码示例来展示如何实现一个基本的自编码器。最后,我们将讨论自编码器的一些变体,如稀疏自编码器和降噪自编码器,以及它们在实际应用中的优势。
|
机器学习/深度学习 人工智能 自然语言处理
揭秘AI:深度学习的奥秘与实践
本文将深入浅出地探讨人工智能中的一个重要分支——深度学习。我们将从基础概念出发,逐步揭示深度学习的原理和工作机制。通过生动的比喻和实际代码示例,本文旨在帮助初学者理解并应用深度学习技术,开启AI之旅。
|
机器学习/深度学习 人工智能 自然语言处理
深入浅出深度学习:从理论到实践的探索之旅
在人工智能的璀璨星空中,深度学习如同一颗耀眼的新星,以其强大的数据处理能力引领着技术革新的浪潮。本文将带您走进深度学习的核心概念,揭示其背后的数学原理,并通过实际案例展示如何应用深度学习模型解决现实世界的问题。无论您是初学者还是有一定基础的开发者,这篇文章都将为您提供宝贵的知识和启发。
268 5
|
机器学习/深度学习 人工智能 自然语言处理
深度学习中的卷积神经网络(CNN): 从理论到实践
本文将深入浅出地介绍卷积神经网络(CNN)的工作原理,并带领读者通过一个简单的图像分类项目,实现从理论到代码的转变。我们将探索CNN如何识别和处理图像数据,并通过实例展示如何训练一个有效的CNN模型。无论你是深度学习领域的新手还是希望扩展你的技术栈,这篇文章都将为你提供宝贵的知识和技能。
1134 7
|
机器学习/深度学习 人工智能 自然语言处理
深度学习中的卷积神经网络:从理论到实践
【10月更文挑战第35天】在人工智能的浪潮中,深度学习技术以其强大的数据处理能力成为科技界的宠儿。其中,卷积神经网络(CNN)作为深度学习的一个重要分支,在图像识别和视频分析等领域展现出了惊人的潜力。本文将深入浅出地介绍CNN的工作原理,并结合实际代码示例,带领读者从零开始构建一个简单的CNN模型,探索其在图像分类任务中的应用。通过本文,读者不仅能够理解CNN背后的数学原理,还能学会如何利用现代深度学习框架实现自己的CNN模型。

热门文章

最新文章