【DSW Gallery】如何在DLC中进行Pytorch DDP分布式训练任务

本文涉及的产品
交互式建模 PAI-DSW,每月250计算时 3个月
模型在线服务 PAI-EAS,A10/V100等 500元 1个月
模型训练 PAI-DLC,100CU*H 3个月
简介: 本文基于Pytorch 1.8版本,介绍了如何使用DLC进行Pytorch DDP分布式训练任务.

直接使用

请打开如何在DLC中进行Pytorch DDP分布式训练任务,并点击右上角 “ 在DSW中打开” 。

image.png

如何使用DLC进行Pytorch DDP分布式训练

基于云原生深度学习训练平台DLC可以进行分布式训练,本文以Pytorch分布式训练为例,讲述如何使用DLC进行Pytorch分布式训练。一般来说,分布式Pytorch的使用者主要关心下面3件事情:

  1. 是否有所需要的足够多的计算资源,Pytorch多机多卡的分布式训练主要采用 RingAllReduce的模式。 本文会以Pytorch DDP分布式训练为例,展示如何使用DLC进行Pytorch DDP分布式训练任务
  2. 安装和配置支撑程序运算的软件和应用
  3. 如何配置分布式训练任务的集群信息 针对上面的三个点,DLC都相应的给出了很便捷,基于云原生的解决方案。
  4. DLC是云原生的深度学习训练平台,所以基于k8s的调度能力和云的资源,可以很好的实现CPU/GPU的按需高效调度
  5. 第二点实际上是运行环境,这一点Docker完美契合这个场景
  6. Pytorch DDP训练一般需要设定下列两个参数: WORLD_SIZE: 当前JOB的总进程数 RANK:当前进程的进程号

1. 如何获取并且引入这两个环境变量

import torch.distributed as dist
# 从环境变量中获取当前JOB的WORLD_SIZE
world_size = os.environ.get("WORLD_SIZE", "{}")
# 从环境变量中获取当前JOB的RANK
rank = os.environ.get("RANK", "{}")
# 将上面拿到的world_size和rank传入process group 
dist.init_process_group(backend='gloo', init_method='env://',world_size=int(world_size),rank=int(rank))

2. 基于Mnist数据集的Pytorch DDP训练代码

import datetime
import logging
import os
import argparse
from math import ceil
from random import Random
import torch
import torch.distributed as dist
import torch.nn as nn 
import torch.nn.functional as F
import torch.optim as optim 
import torch.utils.data
import torch.utils.data.distributed
from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors
from torch.autograd import Variable
from torch.nn.modules import Module
from torchvision import datasets, transforms
gbatch_size = 128
epochs = 10
world_size = os.environ.get("WORLD_SIZE", "{}")
rank = os.environ.get("RANK", "{}")
class DistributedDataParallel(Module):
  def __init__(self, module):
    super(DistributedDataParallel, self).__init__()
    self.module = module
    self.first_call = True
    def allreduce_params():
      if self.needs_reduction:
        self.needs_reduction = False 
        buckets = {}
        for param in self.module.parameters():
          if param.requires_grad and param.grad is not None:
            tp = type(param.data)
            if tp not in buckets:
              buckets[tp] = []
            buckets[tp].append(param)
        for tp in buckets:
          bucket = buckets[tp]
          grads = [param.grad.data for param in bucket]
          coalesced = _flatten_dense_tensors(grads)
          dist.all_reduce(coalesced)
          coalesced /= dist.get_world_size()
          for buf, synced in zip(grads, _unflatten_dense_tensors(coalesced, grads)):
            buf.copy_(synced)
    for param in list(self.module.parameters()):
      def allreduce_hook(*unused): 
        Variable._execution_engine.queue_callback(allreduce_params)  
      if param.requires_grad:
        param.register_hook(allreduce_hook)
  def weight_broadcast(self):
    for param in self.module.parameters():
      dist.broadcast(param.data, 0)
  def forward(self, *inputs, **kwargs):  
    if self.first_call:
      logging.info("first broadcast start")
      self.weight_broadcast()
      self.first_call = False
      logging.info("first broadcast done")
    self.needs_reduction = True  
    return self.module(*inputs, **kwargs)
class Net(nn.Module):
  def __init__(self):
    super(Net, self).__init__()
    self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
    self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
    self.conv2_drop = nn.Dropout2d()
    self.fc1 = nn.Linear(320, 50)
    self.fc2 = nn.Linear(50, 10)
  def forward(self, x): 
    x = F.relu(F.max_pool2d(self.conv1(x), 2))
    x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
    x = x.view(-1, 320)
    x = F.relu(self.fc1(x))
    x = F.dropout(x, training=self.training)
    x = self.fc2(x)
    return F.log_softmax(x)
def partition_dataset(rank):
  dataset = datasets.MNIST(
    './data{}'.format(rank),
    train=True,
    download=True,
    transform=transforms.Compose([
      transforms.ToTensor(),
      transforms.Normalize((0.1307,), (0.3081,))
    ]))
  size = dist.get_world_size()
  bsz = int(gbatch_size / float(size))
  train_sampler = torch.utils.data.distributed.DistributedSampler(dataset)
  train_set = torch.utils.data.DataLoader(
    dataset, batch_size=bsz, shuffle=(train_sampler is None), sampler=train_sampler)
  return train_set, bsz
def average_gradients(model):
  size = float(dist.get_world_size())
  group = dist.new_group([0])
  for param in model.parameters():
    dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM, group=group)
    param.grad.data /= size
def run(gpu):
  rank = dist.get_rank()
  torch.manual_seed(1234)
  train_set, bsz = partition_dataset(rank)
  model = Net()
  if gpu:
    model = model.cuda()
    model = torch.nn.parallel.DistributedDataParallel(model)
  else:
    model = DistributedDataParallel(model)
  optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
  num_batches = ceil(len(train_set.dataset) / float(bsz))
  logging.info("num_batches = %s", num_batches)
  time_start = datetime.datetime.now()
  for epoch in range(epochs):
    epoch_loss = 0.0
    for data, target in train_set:
      if gpu:
        data, target = Variable(data).cuda(), Variable(target).cuda()
      else:
        data, target = Variable(data), Variable(target)
      optimizer.zero_grad()
      output = model(data)
      loss = F.nll_loss(output, target)
      epoch_loss += loss.item()
      loss.backward()
      average_gradients(model)
      optimizer.step()
    logging.info('Epoch {} Loss {:.6f} Global batch size {} on {} ranks'.format(
      epoch, epoch_loss / num_batches, gbatch_size, dist.get_world_size()))
  if gpu:
    logging.info("GPU training time= {}".format(  
      str(datetime.datetime.now() - time_start)))  
  else:
    logging.info("CPU training time= {}".format(  
      str(datetime.datetime.now() - time_start))) 
if __name__ == "__main__":
  logging.basicConfig(level=logging.INFO,
                      format=('%(levelname)s|%(asctime)s'
                              '|%(pathname)s|%(lineno)d| %(message)s'),
                      datefmt='%Y-%m-%dT%H:%M:%S',
                      )
  logging.getLogger().setLevel(logging.INFO)
  parser = argparse.ArgumentParser(description='Train Pytorch model using DDP')
  parser.add_argument('--gpu', action='store_true',
                      help='Use GPU and CUDA')
  parser.set_defaults(gpu=False)
  args = parser.parse_args()
  if args.gpu:
    logging.info("\n======= CUDA INFO =======")
    logging.info("CUDA Availibility: %s", torch.cuda.is_available())
    if torch.cuda.is_available():
      logging.info("CUDA Device Name: %s", torch.cuda.get_device_name(0))
      logging.info("CUDA Version: %s", torch.version.cuda)
    logging.info("=========================\n")
  dist.init_process_group(backend='gloo', init_method='env://',world_size=int(world_size),rank=int(rank))
  run(gpu=False)
  dist.destroy_process_group()

3. 如何在DLC中提交Pytorch 分布式训练任务

3.1 创建新的工作空间

可以参考: https://help.aliyun.com/document_detail/326193.html

3.2 进入工作空间,新增代码配置#

可以参考: https://help.aliyun.com/document_detail/202277.html

3.3 创建数据集配置

目前工作空间中支持下列四种数据集(如下图): 推荐使用阿里云存储,这样无论是性能还是可靠性都有保障

image.png

NAS: 这里有三个参数:

  1. 数据存储类型:本例选择NAS
  2. 选择要挂载的NAS文件系统的ID,这里会有一个列表,列出当前用户所有的NAS文件系统
  3. 挂载路径:这是指定要挂载的文件系统的目录,本例中挂载文件系统的根目录
  4. image.png
  5. OSS:
  1. 数据存储类型:本例选择OSS
  2. 路径: 要挂载的文件系统的路径,可以点击红色框中的按钮选择当前用户所有的OSS文件系统

image.png

3.4 创建&&提交JOB

image.png

image.png

相关实践学习
使用PAI+LLaMA Factory微调Qwen2-VL模型,搭建文旅领域知识问答机器人
使用PAI和LLaMA Factory框架,基于全参方法微调 Qwen2-VL模型,使其能够进行文旅领域知识问答,同时通过人工测试验证了微调的效果。
机器学习概览及常见算法
机器学习(Machine Learning, ML)是人工智能的核心,专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能,它是使计算机具有智能的根本途径,其应用遍及人工智能的各个领域。 本课程将带你入门机器学习,掌握机器学习的概念和常用的算法。
相关文章
|
1月前
|
存储 监控 算法
117_LLM训练的高效分布式策略:从数据并行到ZeRO优化
在2025年,大型语言模型(LLM)的规模已经达到了数千亿甚至数万亿参数,训练这样的庞然大物需要先进的分布式训练技术支持。本文将深入探讨LLM训练中的高效分布式策略,从基础的数据并行到最先进的ZeRO优化技术,为读者提供全面且实用的技术指南。
|
1月前
|
机器学习/深度学习 监控 PyTorch
68_分布式训练技术:DDP与Horovod
随着大型语言模型(LLM)规模的不断扩大,从早期的BERT(数亿参数)到如今的GPT-4(万亿级参数),单卡训练已经成为不可能完成的任务。分布式训练技术应运而生,成为大模型开发的核心基础设施。2025年,分布式训练技术已经发展到相当成熟的阶段,各种优化策略和框架不断涌现,为大模型训练提供了强大的支持。
|
4月前
|
机器学习/深度学习 人工智能 API
AI-Compass LLM训练框架生态:整合ms-swift、Unsloth、Megatron-LM等核心框架,涵盖全参数/PEFT训练与分布式优化
AI-Compass LLM训练框架生态:整合ms-swift、Unsloth、Megatron-LM等核心框架,涵盖全参数/PEFT训练与分布式优化
|
5月前
|
存储 机器学习/深度学习 自然语言处理
避坑指南:PAI-DLC分布式训练BERT模型的3大性能优化策略
本文基于电商搜索场景下的BERT-Large模型训练优化实践,针对数据供给、通信效率与计算资源利用率三大瓶颈,提出异步IO流水线、梯度压缩+拓扑感知、算子融合+混合精度等策略。实测在128卡V100集群上训练速度提升3.2倍,GPU利用率提升至89.3%,训练成本降低70%。适用于大规模分布式深度学习任务的性能调优。
231 2
|
2月前
|
机器学习/深度学习 数据采集 人工智能
PyTorch学习实战:AI从数学基础到模型优化全流程精解
本文系统讲解人工智能、机器学习与深度学习的层级关系,涵盖PyTorch环境配置、张量操作、数据预处理、神经网络基础及模型训练全流程,结合数学原理与代码实践,深入浅出地介绍激活函数、反向传播等核心概念,助力快速入门深度学习。
159 1
|
6月前
|
机器学习/深度学习 PyTorch API
PyTorch量化感知训练技术:模型压缩与高精度边缘部署实践
本文深入探讨神经网络模型量化技术,重点讲解训练后量化(PTQ)与量化感知训练(QAT)两种主流方法。PTQ通过校准数据集确定量化参数,快速实现模型压缩,但精度损失较大;QAT在训练中引入伪量化操作,使模型适应低精度环境,显著提升量化后性能。文章结合PyTorch实现细节,介绍Eager模式、FX图模式及PyTorch 2导出量化等工具,并分享大语言模型Int4/Int8混合精度实践。最后总结量化最佳策略,包括逐通道量化、混合精度设置及目标硬件适配,助力高效部署深度学习模型。
896 21
PyTorch量化感知训练技术:模型压缩与高精度边缘部署实践
|
1月前
|
边缘计算 人工智能 PyTorch
130_知识蒸馏技术:温度参数与损失函数设计 - 教师-学生模型的优化策略与PyTorch实现
随着大型语言模型(LLM)的规模不断增长,部署这些模型面临着巨大的计算和资源挑战。以DeepSeek-R1为例,其671B参数的规模即使经过INT4量化后,仍需要至少6张高端GPU才能运行,这对于大多数中小型企业和研究机构来说成本过高。知识蒸馏作为一种有效的模型压缩技术,通过将大型教师模型的知识迁移到小型学生模型中,在显著降低模型复杂度的同时保留核心性能,成为解决这一问题的关键技术之一。
|
2月前
|
机器学习/深度学习 存储 PyTorch
Neural ODE原理与PyTorch实现:深度学习模型的自适应深度调节
Neural ODE将神经网络与微分方程结合,用连续思维建模数据演化,突破传统离散层的限制,实现自适应深度与高效连续学习。
120 3
Neural ODE原理与PyTorch实现:深度学习模型的自适应深度调节
|
8月前
|
机器学习/深度学习 JavaScript PyTorch
9个主流GAN损失函数的数学原理和Pytorch代码实现:从经典模型到现代变体
生成对抗网络(GAN)的训练效果高度依赖于损失函数的选择。本文介绍了经典GAN损失函数理论,并用PyTorch实现多种变体,包括原始GAN、LS-GAN、WGAN及WGAN-GP等。通过分析其原理与优劣,如LS-GAN提升训练稳定性、WGAN-GP改善图像质量,展示了不同场景下损失函数的设计思路。代码实现覆盖生成器与判别器的核心逻辑,为实际应用提供了重要参考。未来可探索组合优化与自适应设计以提升性能。
645 7
9个主流GAN损失函数的数学原理和Pytorch代码实现:从经典模型到现代变体
|
3月前
|
PyTorch 算法框架/工具 异构计算
PyTorch 2.0性能优化实战:4种常见代码错误严重拖慢模型
我们将深入探讨图中断(graph breaks)和多图问题对性能的负面影响,并分析PyTorch模型开发中应当避免的常见错误模式。
219 9

热门文章

最新文章

推荐镜像

更多