PyTorch 并行训练 DistributedDataParallel完整代码示例

简介: 使用大型数据集训练大型深度神经网络 (DNN) 的问题是深度学习领域的主要挑战。 随着 DNN 和数据集规模的增加,训练这些模型的计算和内存需求也会增加。 这使得在计算资源有限的单台机器上训练这些模型变得困难甚至不可能。 使用大型数据集训练大型 DNN 的一些主要挑战包括:
  • 训练时间长:训练过程可能需要数周甚至数月才能完成,具体取决于模型的复杂性和数据集的大小。
  • 内存限制:大型 DNN 可能需要大量内存来存储训练期间的所有模型参数、梯度和中间激活。 这可能会导致内存不足错误并限制可在单台机器上训练的模型的大小。

为了应对这些挑战,已经开发了各种技术来扩大具有大型数据集的大型 DNN 的训练,包括模型并行性、数据并行性和混合并行性,以及硬件、软件和算法的优化。

在本文中我们将演示使用 PyTorch 的数据并行性和模型并行性。

我们所说的并行性一般是指在多个gpu,或多台机器上训练深度神经网络(dnn),以实现更少的训练时间。数据并行背后的基本思想是将训练数据分成更小的块,让每个GPU或机器处理一个单独的数据块。然后将每个节点的结果组合起来,用于更新模型参数。在数据并行中,模型体系结构在每个节点上是相同的,但模型参数在节点之间进行了分区。每个节点使用分配的数据块训练自己的本地模型,在每次训练迭代结束时,模型参数在所有节点之间同步。这个过程不断重复,直到模型收敛到一个令人满意的结果。

下面我们用用ResNet50和CIFAR10数据集来进行完整的代码示例:

在数据并行中,模型架构在每个节点上保持相同,但模型参数在节点之间进行了分区,每个节点使用分配的数据块训练自己的本地模型。

PyTorch的DistributedDataParallel 库可以进行跨节点的梯度和模型参数的高效通信和同步,实现分布式训练。本文提供了如何使用ResNet50和CIFAR10数据集使用PyTorch实现数据并行的示例,其中代码在多个gpu或机器上运行,每台机器处理训练数据的一个子集。训练过程使用PyTorch的DistributedDataParallel 库进行并行化。

导入必须要的库

 importos
 fromdatetimeimportdatetime
 fromtimeimporttime
 importargparse
 importtorchvision
 importtorchvision.transformsastransforms
 importtorch
 importtorch.nnasnn
 importtorch.distributedasdist
 fromtorch.nn.parallelimportDistributedDataParallel

接下来,我们将检查GPU

 importsubprocess
 result=subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE)
 print(result.stdout.decode())

因为我们需要在多个服务器上运行,所以手动一个一个执行并不现实,所以需要有一个调度程序。这里我们使用SLURM文件来运行代码(slurm面向Linux和Unix类似内核的免费和开源工作调度程序),

 defmain():
         
     # get distributed configuration from Slurm environment
     
     parser=argparse.ArgumentParser()
     parser.add_argument('-b', '--batch-size', default=128, type=int,
                         help='batch size. it will be divided in mini-batch for each worker')
     parser.add_argument('-e','--epochs', default=2, type=int, metavar='N',
                         help='number of total epochs to run')
     parser.add_argument('-c','--checkpoint', default=None, type=str,
                         help='path to checkpoint to load')
     args=parser.parse_args()
     
     rank=int(os.environ['SLURM_PROCID'])
     local_rank=int(os.environ['SLURM_LOCALID'])
     size=int(os.environ['SLURM_NTASKS'])
     master_addr=os.environ["SLURM_SRUN_COMM_HOST"]
     port="29500"
     node_id=os.environ['SLURM_NODEID']
     ddp_arg= [rank, local_rank, size, master_addr, port, node_id]
     train(args, ddp_arg)     

然后我们使用DistributedDataParallel 库来执行分布式训练。

 deftrain(args, ddp_arg):
     
     rank, local_rank, size, MASTER_ADDR, port, NODE_ID=ddp_arg
     
     # display info
     ifrank==0:
         #print(">>> Training on ", len(hostnames), " nodes and ", size, " processes, master node is ", MASTER_ADDR)
         print(">>> Training on ", size, " GPUs, master node is ", MASTER_ADDR)
     #print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))
     
     print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))
     
     
     # configure distribution method: define address and port of the master node and initialise communication backend (NCCL)
     #dist.init_process_group(backend='nccl', init_method='env://', world_size=size, rank=rank)
     dist.init_process_group(
         backend='nccl',
         init_method='tcp://{}:{}'.format(MASTER_ADDR, port),
         world_size=size,
         rank=rank
     )
     
     # distribute model
     torch.cuda.set_device(local_rank)
     gpu=torch.device("cuda")
     #model = ResNet18(classes=10).to(gpu)
     model=torchvision.models.resnet50(pretrained=False).to(gpu)
     ddp_model=DistributedDataParallel(model, device_ids=[local_rank])
     ifargs.checkpointisnotNone:
         map_location= {'cuda:%d'%0: 'cuda:%d'%local_rank}
         ddp_model.load_state_dict(torch.load(args.checkpoint, map_location=map_location))
     
     # distribute batch size (mini-batch)
     batch_size=args.batch_size
     batch_size_per_gpu=batch_size//size
     
     # define loss function (criterion) and optimizer
     criterion=nn.CrossEntropyLoss()  
     optimizer=torch.optim.SGD(ddp_model.parameters(), 1e-4)
     
     
     transform_train=transforms.Compose([
         transforms.RandomCrop(32, padding=4),
         transforms.RandomHorizontalFlip(),
         transforms.ToTensor(),
         transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
     ])
 
     # load data with distributed sampler
     #train_dataset = torchvision.datasets.CIFAR10(root='./data',
     #                                           train=True,
     #                                           transform=transform_train,
     #                                           download=False)
     
     # load data with distributed sampler
     train_dataset=torchvision.datasets.CIFAR10(root='./data',
                                                train=True,
                                                transform=transform_train,
                                                download=False)
     
     train_sampler=torch.utils.data.distributed.DistributedSampler(train_dataset,
                                                                     num_replicas=size,
                                                                     rank=rank)
     
     train_loader=torch.utils.data.DataLoader(dataset=train_dataset,
                                                batch_size=batch_size_per_gpu,
                                                shuffle=False,
                                                num_workers=0,
                                                pin_memory=True,
                                                sampler=train_sampler)
 
     # training (timers and display handled by process 0)
     ifrank==0: start=datetime.now()         
     total_step=len(train_loader)
     
     forepochinrange(args.epochs):
         ifrank==0: start_dataload=time()
         
         fori, (images, labels) inenumerate(train_loader):
             
             # distribution of images and labels to all GPUs
             images=images.to(gpu, non_blocking=True)
             labels=labels.to(gpu, non_blocking=True) 
             
             ifrank==0: stop_dataload=time()
 
             ifrank==0: start_training=time()
             
             # forward pass
             outputs=ddp_model(images)
             loss=criterion(outputs, labels)
 
             # backward and optimize
             optimizer.zero_grad()
             loss.backward()
             optimizer.step()
             
             ifrank==0: stop_training=time() 
             if (i+1) %10==0andrank==0:
                 print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Time data load: {:.3f}ms, Time training: {:.3f}ms'.format(epoch+1, args.epochs,
                                                                         i+1, total_step, loss.item(), (stop_dataload-start_dataload)*1000,
                                                                         (stop_training-start_training)*1000))
             ifrank==0: start_dataload=time()
                     
         #Save checkpoint at every end of epoch
         ifrank==0:
             torch.save(ddp_model.state_dict(), './checkpoint/{}GPU_{}epoch.checkpoint'.format(size, epoch+1))
 
     ifrank==0:
         print(">>> Training complete in: "+str(datetime.now() -start))
 
 
 if__name__=='__main__':
 
     main()

代码将数据和模型分割到多个gpu上,并以分布式的方式更新模型。下面是代码的一些解释:

train(args, ddp_arg)有两个参数,args和ddp_arg,其中args是传递给脚本的命令行参数,ddp_arg包含分布式训练相关参数。

rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg:解包ddp_arg中分布式训练相关参数。

如果rank为0,则打印当前使用的gpu数量和主节点IP地址信息。

dist.init_process_group(backend='nccl', init_method='tcp://{}:{}'.format(MASTER_ADDR, port), world_size=size, rank=rank) :使用NCCL后端初始化分布式进程组。

torch.cuda.set_device(local_rank):为这个进程选择指定的GPU。

model = torchvision.models. ResNet50 (pretrained=False).to(gpu):从torchvision模型中加载ResNet50模型,并将其移动到指定的gpu。

ddp_model = DistributedDataParallel(model, device_ids=[local_rank]):将模型包装在DistributedDataParallel模块中,也就是说这样我们就可以进行分布式训练了

加载CIFAR-10数据集并应用数据增强转换。

train_sampler=torch.utils.data.distributed.DistributedSampler(train_dataset,num_replicas=size,rank=rank):创建一个DistributedSampler对象,将数据集分割到多个gpu上。

train_loader =torch.utils.data.DataLoader(dataset=train_dataset,batch_size=batch_size_per_gpu,shuffle=False,num_workers=0,pin_memory=True,sampler=train_sampler):创建一个DataLoader对象,数据将批量加载到模型中,这与我们平常训练的步骤是一致的只不过是增加了一个分布式的数据采样DistributedSampler

为指定的epoch数训练模型,以分布式的方式使用optimizer.step()更新权重。

rank0在每个轮次结束时保存一个检查点。

rank0每10个批次显示损失和训练时间。

结束训练时打印训练模型所花费的总时间也是在rank0上。

代码测试

在使用1个节点1/2/3/4个gpu, 2个节点6/8个gpu,每个节点3/4个gpu上进行了训练Cifar10上的Resnet50的测试如下图所示,每次测试的批处理大小保持不变。完成每项测试所花费的时间以秒为单位记录。随着使用的gpu数量的增加,完成测试所需的时间会减少。当使用8个gpu时,需要320秒才能完成,这是记录中最快的时间。这是肯定的,但是我们可以看到训练的速度并没有像GPU数量增长呈现线性的增长,这可能是因为Resnet50算是一个比较小的模型了,并不需要进行并行化训练。

在多个gpu上使用数据并行可以显著减少在给定数据集上训练深度神经网络(DNN)所需的时间。随着gpu数量的增加,完成训练过程所需的时间减少,这表明DNN可以更有效地并行训练。

这种方法在处理大型数据集或复杂的DNN架构时特别有用。通过利用多个gpu,可以加快训练过程,实现更快的模型迭代和实验。但是需要注意的是,通过Data Parallelism实现的性能提升可能会受到通信开销和GPU内存限制等因素的限制,需要仔细调优才能获得最佳结果。

https://avoid.overfit.cn/post/67095b9014cb40888238b84fea17e872

作者:Joseph El Kettaneh

相关实践学习
部署Stable Diffusion玩转AI绘画(GPU云服务器)
本实验通过在ECS上从零开始部署Stable Diffusion来进行AI绘画创作,开启AIGC盲盒。
目录
相关文章
|
1月前
|
存储 物联网 PyTorch
基于PyTorch的大语言模型微调指南:Torchtune完整教程与代码示例
**Torchtune**是由PyTorch团队开发的一个专门用于LLM微调的库。它旨在简化LLM的微调流程,提供了一系列高级API和预置的最佳实践
162 59
基于PyTorch的大语言模型微调指南:Torchtune完整教程与代码示例
|
3月前
|
机器学习/深度学习 PyTorch 算法框架/工具
CNN中的注意力机制综合指南:从理论到Pytorch代码实现
注意力机制已成为深度学习模型的关键组件,尤其在卷积神经网络(CNN)中发挥了重要作用。通过使模型关注输入数据中最相关的部分,注意力机制显著提升了CNN在图像分类、目标检测和语义分割等任务中的表现。本文将详细介绍CNN中的注意力机制,包括其基本概念、不同类型(如通道注意力、空间注意力和混合注意力)以及实际实现方法。此外,还将探讨注意力机制在多个计算机视觉任务中的应用效果及其面临的挑战。无论是图像分类还是医学图像分析,注意力机制都能显著提升模型性能,并在不断发展的深度学习领域中扮演重要角色。
123 10
|
3月前
|
并行计算 PyTorch 算法框架/工具
基于CUDA12.1+CUDNN8.9+PYTORCH2.3.1,实现自定义数据集训练
文章介绍了如何在CUDA 12.1、CUDNN 8.9和PyTorch 2.3.1环境下实现自定义数据集的训练,包括环境配置、预览结果和核心步骤,以及遇到问题的解决方法和参考链接。
163 4
基于CUDA12.1+CUDNN8.9+PYTORCH2.3.1,实现自定义数据集训练
|
2月前
|
机器学习/深度学习 PyTorch 算法框架/工具
聊一聊计算机视觉中常用的注意力机制以及Pytorch代码实现
本文介绍了几种常用的计算机视觉注意力机制及其PyTorch实现,包括SENet、CBAM、BAM、ECA-Net、SA-Net、Polarized Self-Attention、Spatial Group-wise Enhance和Coordinate Attention等,每种方法都附有详细的网络结构说明和实验结果分析。通过这些注意力机制的应用,可以有效提升模型在目标检测任务上的性能。此外,作者还提供了实验数据集的基本情况及baseline模型的选择与实验结果,方便读者理解和复现。
64 0
聊一聊计算机视觉中常用的注意力机制以及Pytorch代码实现
|
4月前
|
机器学习/深度学习 并行计算 PyTorch
GPU 加速与 PyTorch:最大化硬件性能提升训练速度
【8月更文第29天】GPU(图形处理单元)因其并行计算能力而成为深度学习领域的重要组成部分。本文将介绍如何利用PyTorch来高效地利用GPU进行深度学习模型的训练,从而最大化训练速度。我们将讨论如何配置环境、选择合适的硬件、编写高效的代码以及利用高级特性来提高性能。
861 1
|
4月前
|
机器学习/深度学习 并行计算 PyTorch
PyTorch与DistributedDataParallel:分布式训练入门指南
【8月更文第27天】随着深度学习模型变得越来越复杂,单一GPU已经无法满足训练大规模模型的需求。分布式训练成为了加速模型训练的关键技术之一。PyTorch 提供了多种工具来支持分布式训练,其中 DistributedDataParallel (DDP) 是一个非常受欢迎且易用的选择。本文将详细介绍如何使用 PyTorch 的 DDP 模块来进行分布式训练,并通过一个简单的示例来演示其使用方法。
527 2
|
5月前
|
机器学习/深度学习 PyTorch 编译器
Pytorch的编译新特性TorchDynamo的工作原理和使用示例
PyTorch的TorchDynamo是一个即时编译器,用于优化动态图执行,提高运行效率。它在运行时分析和转换代码,应用优化技术,如操作符融合,然后编译成高效机器码。通过一个包含特征工程、超参数调整、交叉验证的合成数据集示例,展示了TorchDynamo如何减少训练时间并提高模型性能。它易于集成,只需对现有PyTorch代码进行小改动,即可利用其性能提升。TorchDynamo的优化包括动态捕获计算图、应用优化和编译,适用于实时应用和需要快速响应的场景。
92 11
|
5月前
|
资源调度 PyTorch 调度
多任务高斯过程数学原理和Pytorch实现示例
本文探讨了如何使用高斯过程扩展到多任务场景,强调了多任务高斯过程(MTGP)在处理相关输出时的优势。通过独立多任务GP、内在模型(ICM)和线性模型(LMC)的核心区域化方法,MTGP能够捕捉任务间的依赖关系,提高泛化能力。ICM和LMC通过引入核心区域化矩阵来学习任务间的共享结构。在PyTorch中,使用GPyTorch库展示了如何实现ICM模型,包括噪声建模和训练过程。实验比较了MTGP与独立GP,显示了MTGP在预测性能上的提升。
101 7
|
4月前
|
机器学习/深度学习 PyTorch 算法框架/工具
PyTorch代码实现神经网络
这段代码示例展示了如何在PyTorch中构建一个基础的卷积神经网络(CNN)。该网络包括两个卷积层,分别用于提取图像特征,每个卷积层后跟一个池化层以降低空间维度;之后是三个全连接层,用于分类输出。此结构适用于图像识别任务,并可根据具体应用调整参数与层数。
|
4月前
|
机器学习/深度学习 PyTorch 测试技术
深度学习入门:使用 PyTorch 构建和训练你的第一个神经网络
【8月更文第29天】深度学习是机器学习的一个分支,它利用多层非线性处理单元(即神经网络)来解决复杂的模式识别问题。PyTorch 是一个强大的深度学习框架,它提供了灵活的 API 和动态计算图,非常适合初学者和研究者使用。
57 0
下一篇
DataWorks