【DSW Gallery】如何在DLC中进行Pytorch DDP分布式训练任务

本文涉及的产品
交互式建模 PAI-DSW,每月250计算时 3个月
模型训练 PAI-DLC,100CU*H 3个月
模型在线服务 PAI-EAS,A10/V100等 500元 1个月
简介: 本文基于Pytorch 1.8版本,介绍了如何使用DLC进行Pytorch DDP分布式训练任务.

直接使用

请打开如何在DLC中进行Pytorch DDP分布式训练任务,并点击右上角 “ 在DSW中打开” 。

image.png

如何使用DLC进行Pytorch DDP分布式训练

基于云原生深度学习训练平台DLC可以进行分布式训练,本文以Pytorch分布式训练为例,讲述如何使用DLC进行Pytorch分布式训练。一般来说,分布式Pytorch的使用者主要关心下面3件事情:

  1. 是否有所需要的足够多的计算资源,Pytorch多机多卡的分布式训练主要采用 RingAllReduce的模式。 本文会以Pytorch DDP分布式训练为例,展示如何使用DLC进行Pytorch DDP分布式训练任务
  2. 安装和配置支撑程序运算的软件和应用
  3. 如何配置分布式训练任务的集群信息 针对上面的三个点,DLC都相应的给出了很便捷,基于云原生的解决方案。
  4. DLC是云原生的深度学习训练平台,所以基于k8s的调度能力和云的资源,可以很好的实现CPU/GPU的按需高效调度
  5. 第二点实际上是运行环境,这一点Docker完美契合这个场景
  6. Pytorch DDP训练一般需要设定下列两个参数: WORLD_SIZE: 当前JOB的总进程数 RANK:当前进程的进程号

1. 如何获取并且引入这两个环境变量

import torch.distributed as dist
# 从环境变量中获取当前JOB的WORLD_SIZE
world_size = os.environ.get("WORLD_SIZE", "{}")
# 从环境变量中获取当前JOB的RANK
rank = os.environ.get("RANK", "{}")
# 将上面拿到的world_size和rank传入process group 
dist.init_process_group(backend='gloo', init_method='env://',world_size=int(world_size),rank=int(rank))

2. 基于Mnist数据集的Pytorch DDP训练代码

import datetime
import logging
import os
import argparse
from math import ceil
from random import Random
import torch
import torch.distributed as dist
import torch.nn as nn 
import torch.nn.functional as F
import torch.optim as optim 
import torch.utils.data
import torch.utils.data.distributed
from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors
from torch.autograd import Variable
from torch.nn.modules import Module
from torchvision import datasets, transforms
gbatch_size = 128
epochs = 10
world_size = os.environ.get("WORLD_SIZE", "{}")
rank = os.environ.get("RANK", "{}")
class DistributedDataParallel(Module):
  def __init__(self, module):
    super(DistributedDataParallel, self).__init__()
    self.module = module
    self.first_call = True
    def allreduce_params():
      if self.needs_reduction:
        self.needs_reduction = False 
        buckets = {}
        for param in self.module.parameters():
          if param.requires_grad and param.grad is not None:
            tp = type(param.data)
            if tp not in buckets:
              buckets[tp] = []
            buckets[tp].append(param)
        for tp in buckets:
          bucket = buckets[tp]
          grads = [param.grad.data for param in bucket]
          coalesced = _flatten_dense_tensors(grads)
          dist.all_reduce(coalesced)
          coalesced /= dist.get_world_size()
          for buf, synced in zip(grads, _unflatten_dense_tensors(coalesced, grads)):
            buf.copy_(synced)
    for param in list(self.module.parameters()):
      def allreduce_hook(*unused): 
        Variable._execution_engine.queue_callback(allreduce_params)  
      if param.requires_grad:
        param.register_hook(allreduce_hook)
  def weight_broadcast(self):
    for param in self.module.parameters():
      dist.broadcast(param.data, 0)
  def forward(self, *inputs, **kwargs):  
    if self.first_call:
      logging.info("first broadcast start")
      self.weight_broadcast()
      self.first_call = False
      logging.info("first broadcast done")
    self.needs_reduction = True  
    return self.module(*inputs, **kwargs)
class Net(nn.Module):
  def __init__(self):
    super(Net, self).__init__()
    self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
    self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
    self.conv2_drop = nn.Dropout2d()
    self.fc1 = nn.Linear(320, 50)
    self.fc2 = nn.Linear(50, 10)
  def forward(self, x): 
    x = F.relu(F.max_pool2d(self.conv1(x), 2))
    x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
    x = x.view(-1, 320)
    x = F.relu(self.fc1(x))
    x = F.dropout(x, training=self.training)
    x = self.fc2(x)
    return F.log_softmax(x)
def partition_dataset(rank):
  dataset = datasets.MNIST(
    './data{}'.format(rank),
    train=True,
    download=True,
    transform=transforms.Compose([
      transforms.ToTensor(),
      transforms.Normalize((0.1307,), (0.3081,))
    ]))
  size = dist.get_world_size()
  bsz = int(gbatch_size / float(size))
  train_sampler = torch.utils.data.distributed.DistributedSampler(dataset)
  train_set = torch.utils.data.DataLoader(
    dataset, batch_size=bsz, shuffle=(train_sampler is None), sampler=train_sampler)
  return train_set, bsz
def average_gradients(model):
  size = float(dist.get_world_size())
  group = dist.new_group([0])
  for param in model.parameters():
    dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM, group=group)
    param.grad.data /= size
def run(gpu):
  rank = dist.get_rank()
  torch.manual_seed(1234)
  train_set, bsz = partition_dataset(rank)
  model = Net()
  if gpu:
    model = model.cuda()
    model = torch.nn.parallel.DistributedDataParallel(model)
  else:
    model = DistributedDataParallel(model)
  optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
  num_batches = ceil(len(train_set.dataset) / float(bsz))
  logging.info("num_batches = %s", num_batches)
  time_start = datetime.datetime.now()
  for epoch in range(epochs):
    epoch_loss = 0.0
    for data, target in train_set:
      if gpu:
        data, target = Variable(data).cuda(), Variable(target).cuda()
      else:
        data, target = Variable(data), Variable(target)
      optimizer.zero_grad()
      output = model(data)
      loss = F.nll_loss(output, target)
      epoch_loss += loss.item()
      loss.backward()
      average_gradients(model)
      optimizer.step()
    logging.info('Epoch {} Loss {:.6f} Global batch size {} on {} ranks'.format(
      epoch, epoch_loss / num_batches, gbatch_size, dist.get_world_size()))
  if gpu:
    logging.info("GPU training time= {}".format(  
      str(datetime.datetime.now() - time_start)))  
  else:
    logging.info("CPU training time= {}".format(  
      str(datetime.datetime.now() - time_start))) 
if __name__ == "__main__":
  logging.basicConfig(level=logging.INFO,
                      format=('%(levelname)s|%(asctime)s'
                              '|%(pathname)s|%(lineno)d| %(message)s'),
                      datefmt='%Y-%m-%dT%H:%M:%S',
                      )
  logging.getLogger().setLevel(logging.INFO)
  parser = argparse.ArgumentParser(description='Train Pytorch model using DDP')
  parser.add_argument('--gpu', action='store_true',
                      help='Use GPU and CUDA')
  parser.set_defaults(gpu=False)
  args = parser.parse_args()
  if args.gpu:
    logging.info("\n======= CUDA INFO =======")
    logging.info("CUDA Availibility: %s", torch.cuda.is_available())
    if torch.cuda.is_available():
      logging.info("CUDA Device Name: %s", torch.cuda.get_device_name(0))
      logging.info("CUDA Version: %s", torch.version.cuda)
    logging.info("=========================\n")
  dist.init_process_group(backend='gloo', init_method='env://',world_size=int(world_size),rank=int(rank))
  run(gpu=False)
  dist.destroy_process_group()

3. 如何在DLC中提交Pytorch 分布式训练任务

3.1 创建新的工作空间

可以参考: https://help.aliyun.com/document_detail/326193.html

3.2 进入工作空间,新增代码配置#

可以参考: https://help.aliyun.com/document_detail/202277.html

3.3 创建数据集配置

目前工作空间中支持下列四种数据集(如下图): 推荐使用阿里云存储,这样无论是性能还是可靠性都有保障

image.png

NAS: 这里有三个参数:

  1. 数据存储类型:本例选择NAS
  2. 选择要挂载的NAS文件系统的ID,这里会有一个列表,列出当前用户所有的NAS文件系统
  3. 挂载路径:这是指定要挂载的文件系统的目录,本例中挂载文件系统的根目录
  4. image.png
  5. OSS:
  1. 数据存储类型:本例选择OSS
  2. 路径: 要挂载的文件系统的路径,可以点击红色框中的按钮选择当前用户所有的OSS文件系统

image.png

3.4 创建&&提交JOB

image.png

image.png

相关实践学习
使用PAI-EAS一键部署ChatGLM及LangChain应用
本场景中主要介绍如何使用模型在线服务(PAI-EAS)部署ChatGLM的AI-Web应用以及启动WebUI进行模型推理,并通过LangChain集成自己的业务数据。
机器学习概览及常见算法
机器学习(Machine Learning, ML)是人工智能的核心,专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能,它是使计算机具有智能的根本途径,其应用遍及人工智能的各个领域。 本课程将带你入门机器学习,掌握机器学习的概念和常用的算法。
目录
打赏
0
0
0
0
3569
分享
相关文章
体验用分布式数据库突破资源瓶颈,完成任务领智能台灯!
体验用分布式数据库突破资源瓶颈,完成任务领智能台灯!
DeepRec Extension 打造稳定高效的分布式训练
DeepRec Extension 打造稳定高效的分布式训练
阿里云PAI发布DeepRec Extension,打造稳定高效的分布式训练,并宣布开源!
阿里云PAI发布DeepRec Extension,打造稳定高效的分布式训练,并宣布开源!
DeepSeek进阶开发与应用4:DeepSeek中的分布式训练技术
随着深度学习模型和数据集规模的扩大,单机训练已无法满足需求,分布式训练技术应运而生。DeepSeek框架支持数据并行和模型并行两种模式,通过将计算任务分配到多个节点上并行执行,显著提高训练效率。本文介绍DeepSeek中的分布式训练技术,包括配置与启动方法,帮助用户轻松实现大规模模型训练。数据并行通过`MirroredStrategy`同步梯度,适用于大多数模型;模型并行则通过`ParameterServerStrategy`异步处理大模型。DeepSeek简化了分布式环境配置,支持单机多卡和多机多卡等场景。
分布式大模型训练的性能建模与调优
阿里云智能集团弹性计算高级技术专家林立翔分享了分布式大模型训练的性能建模与调优。内容涵盖四大方面:1) 大模型对AI基础设施的性能挑战,强调规模增大带来的显存和算力需求;2) 大模型训练的性能分析和建模,介绍TOP-DOWN和bottom-up方法论及工具;3) 基于建模分析的性能优化,通过案例展示显存预估和流水线失衡优化;4) 宣传阿里云AI基础设施,提供高效算力集群、网络及软件支持,助力大模型训练与推理。
基于昇腾用PyTorch实现传统CTR模型WideDeep网络
本文介绍了如何在昇腾平台上使用PyTorch实现经典的WideDeep网络模型,以处理推荐系统中的点击率(CTR)预测问题。
211 66
用PyTorch从零构建 DeepSeek R1:模型架构和分步训练详解
本文详细介绍了DeepSeek R1模型的构建过程,涵盖从基础模型选型到多阶段训练流程,再到关键技术如强化学习、拒绝采样和知识蒸馏的应用。
99 3
用PyTorch从零构建 DeepSeek R1:模型架构和分步训练详解
Pytorch学习笔记(九):Pytorch模型的FLOPs、模型参数量等信息输出(torchstat、thop、ptflops、torchsummary)
本文介绍了如何使用torchstat、thop、ptflops和torchsummary等工具来计算Pytorch模型的FLOPs、模型参数量等信息。
694 2
Transformer模型变长序列优化:解析PyTorch上的FlashAttention2与xFormers
本文探讨了Transformer模型中变长输入序列的优化策略,旨在解决深度学习中常见的计算效率问题。文章首先介绍了批处理变长输入的技术挑战,特别是填充方法导致的资源浪费。随后,提出了多种优化技术,包括动态填充、PyTorch NestedTensors、FlashAttention2和XFormers的memory_efficient_attention。这些技术通过减少冗余计算、优化内存管理和改进计算模式,显著提升了模型的性能。实验结果显示,使用FlashAttention2和无填充策略的组合可以将步骤时间减少至323毫秒,相比未优化版本提升了约2.5倍。
111 3
Transformer模型变长序列优化:解析PyTorch上的FlashAttention2与xFormers

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等