【DSW Gallery】如何在DLC中进行Pytorch DDP分布式训练任务

本文涉及的产品
交互式建模 PAI-DSW,每月250计算时 3个月
模型在线服务 PAI-EAS,A10/V100等 500元 1个月
模型训练 PAI-DLC,5000CU*H 3个月
简介: 本文基于Pytorch 1.8版本,介绍了如何使用DLC进行Pytorch DDP分布式训练任务.

直接使用

请打开如何在DLC中进行Pytorch DDP分布式训练任务,并点击右上角 “ 在DSW中打开” 。

image.png

如何使用DLC进行Pytorch DDP分布式训练

基于云原生深度学习训练平台DLC可以进行分布式训练,本文以Pytorch分布式训练为例,讲述如何使用DLC进行Pytorch分布式训练。一般来说,分布式Pytorch的使用者主要关心下面3件事情:

  1. 是否有所需要的足够多的计算资源,Pytorch多机多卡的分布式训练主要采用 RingAllReduce的模式。 本文会以Pytorch DDP分布式训练为例,展示如何使用DLC进行Pytorch DDP分布式训练任务
  2. 安装和配置支撑程序运算的软件和应用
  3. 如何配置分布式训练任务的集群信息 针对上面的三个点,DLC都相应的给出了很便捷,基于云原生的解决方案。
  4. DLC是云原生的深度学习训练平台,所以基于k8s的调度能力和云的资源,可以很好的实现CPU/GPU的按需高效调度
  5. 第二点实际上是运行环境,这一点Docker完美契合这个场景
  6. Pytorch DDP训练一般需要设定下列两个参数: WORLD_SIZE: 当前JOB的总进程数 RANK:当前进程的进程号

1. 如何获取并且引入这两个环境变量

import torch.distributed as dist
# 从环境变量中获取当前JOB的WORLD_SIZE
world_size = os.environ.get("WORLD_SIZE", "{}")
# 从环境变量中获取当前JOB的RANK
rank = os.environ.get("RANK", "{}")
# 将上面拿到的world_size和rank传入process group 
dist.init_process_group(backend='gloo', init_method='env://',world_size=int(world_size),rank=int(rank))

2. 基于Mnist数据集的Pytorch DDP训练代码

import datetime
import logging
import os
import argparse
from math import ceil
from random import Random
import torch
import torch.distributed as dist
import torch.nn as nn 
import torch.nn.functional as F
import torch.optim as optim 
import torch.utils.data
import torch.utils.data.distributed
from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors
from torch.autograd import Variable
from torch.nn.modules import Module
from torchvision import datasets, transforms
gbatch_size = 128
epochs = 10
world_size = os.environ.get("WORLD_SIZE", "{}")
rank = os.environ.get("RANK", "{}")
class DistributedDataParallel(Module):
  def __init__(self, module):
    super(DistributedDataParallel, self).__init__()
    self.module = module
    self.first_call = True
    def allreduce_params():
      if self.needs_reduction:
        self.needs_reduction = False 
        buckets = {}
        for param in self.module.parameters():
          if param.requires_grad and param.grad is not None:
            tp = type(param.data)
            if tp not in buckets:
              buckets[tp] = []
            buckets[tp].append(param)
        for tp in buckets:
          bucket = buckets[tp]
          grads = [param.grad.data for param in bucket]
          coalesced = _flatten_dense_tensors(grads)
          dist.all_reduce(coalesced)
          coalesced /= dist.get_world_size()
          for buf, synced in zip(grads, _unflatten_dense_tensors(coalesced, grads)):
            buf.copy_(synced)
    for param in list(self.module.parameters()):
      def allreduce_hook(*unused): 
        Variable._execution_engine.queue_callback(allreduce_params)  
      if param.requires_grad:
        param.register_hook(allreduce_hook)
  def weight_broadcast(self):
    for param in self.module.parameters():
      dist.broadcast(param.data, 0)
  def forward(self, *inputs, **kwargs):  
    if self.first_call:
      logging.info("first broadcast start")
      self.weight_broadcast()
      self.first_call = False
      logging.info("first broadcast done")
    self.needs_reduction = True  
    return self.module(*inputs, **kwargs)
class Net(nn.Module):
  def __init__(self):
    super(Net, self).__init__()
    self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
    self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
    self.conv2_drop = nn.Dropout2d()
    self.fc1 = nn.Linear(320, 50)
    self.fc2 = nn.Linear(50, 10)
  def forward(self, x): 
    x = F.relu(F.max_pool2d(self.conv1(x), 2))
    x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
    x = x.view(-1, 320)
    x = F.relu(self.fc1(x))
    x = F.dropout(x, training=self.training)
    x = self.fc2(x)
    return F.log_softmax(x)
def partition_dataset(rank):
  dataset = datasets.MNIST(
    './data{}'.format(rank),
    train=True,
    download=True,
    transform=transforms.Compose([
      transforms.ToTensor(),
      transforms.Normalize((0.1307,), (0.3081,))
    ]))
  size = dist.get_world_size()
  bsz = int(gbatch_size / float(size))
  train_sampler = torch.utils.data.distributed.DistributedSampler(dataset)
  train_set = torch.utils.data.DataLoader(
    dataset, batch_size=bsz, shuffle=(train_sampler is None), sampler=train_sampler)
  return train_set, bsz
def average_gradients(model):
  size = float(dist.get_world_size())
  group = dist.new_group([0])
  for param in model.parameters():
    dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM, group=group)
    param.grad.data /= size
def run(gpu):
  rank = dist.get_rank()
  torch.manual_seed(1234)
  train_set, bsz = partition_dataset(rank)
  model = Net()
  if gpu:
    model = model.cuda()
    model = torch.nn.parallel.DistributedDataParallel(model)
  else:
    model = DistributedDataParallel(model)
  optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
  num_batches = ceil(len(train_set.dataset) / float(bsz))
  logging.info("num_batches = %s", num_batches)
  time_start = datetime.datetime.now()
  for epoch in range(epochs):
    epoch_loss = 0.0
    for data, target in train_set:
      if gpu:
        data, target = Variable(data).cuda(), Variable(target).cuda()
      else:
        data, target = Variable(data), Variable(target)
      optimizer.zero_grad()
      output = model(data)
      loss = F.nll_loss(output, target)
      epoch_loss += loss.item()
      loss.backward()
      average_gradients(model)
      optimizer.step()
    logging.info('Epoch {} Loss {:.6f} Global batch size {} on {} ranks'.format(
      epoch, epoch_loss / num_batches, gbatch_size, dist.get_world_size()))
  if gpu:
    logging.info("GPU training time= {}".format(  
      str(datetime.datetime.now() - time_start)))  
  else:
    logging.info("CPU training time= {}".format(  
      str(datetime.datetime.now() - time_start))) 
if __name__ == "__main__":
  logging.basicConfig(level=logging.INFO,
                      format=('%(levelname)s|%(asctime)s'
                              '|%(pathname)s|%(lineno)d| %(message)s'),
                      datefmt='%Y-%m-%dT%H:%M:%S',
                      )
  logging.getLogger().setLevel(logging.INFO)
  parser = argparse.ArgumentParser(description='Train Pytorch model using DDP')
  parser.add_argument('--gpu', action='store_true',
                      help='Use GPU and CUDA')
  parser.set_defaults(gpu=False)
  args = parser.parse_args()
  if args.gpu:
    logging.info("\n======= CUDA INFO =======")
    logging.info("CUDA Availibility: %s", torch.cuda.is_available())
    if torch.cuda.is_available():
      logging.info("CUDA Device Name: %s", torch.cuda.get_device_name(0))
      logging.info("CUDA Version: %s", torch.version.cuda)
    logging.info("=========================\n")
  dist.init_process_group(backend='gloo', init_method='env://',world_size=int(world_size),rank=int(rank))
  run(gpu=False)
  dist.destroy_process_group()

3. 如何在DLC中提交Pytorch 分布式训练任务

3.1 创建新的工作空间

可以参考: https://help.aliyun.com/document_detail/326193.html

3.2 进入工作空间,新增代码配置#

可以参考: https://help.aliyun.com/document_detail/202277.html

3.3 创建数据集配置

目前工作空间中支持下列四种数据集(如下图): 推荐使用阿里云存储,这样无论是性能还是可靠性都有保障

image.png

NAS: 这里有三个参数:

  1. 数据存储类型:本例选择NAS
  2. 选择要挂载的NAS文件系统的ID,这里会有一个列表,列出当前用户所有的NAS文件系统
  3. 挂载路径:这是指定要挂载的文件系统的目录,本例中挂载文件系统的根目录
  4. image.png
  5. OSS:
  1. 数据存储类型:本例选择OSS
  2. 路径: 要挂载的文件系统的路径,可以点击红色框中的按钮选择当前用户所有的OSS文件系统

image.png

3.4 创建&&提交JOB

image.png

image.png

相关实践学习
使用PAI-EAS一键部署ChatGLM及LangChain应用
本场景中主要介绍如何使用模型在线服务(PAI-EAS)部署ChatGLM的AI-Web应用以及启动WebUI进行模型推理,并通过LangChain集成自己的业务数据。
机器学习概览及常见算法
机器学习(Machine Learning, ML)是人工智能的核心,专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能,它是使计算机具有智能的根本途径,其应用遍及人工智能的各个领域。 本课程将带你入门机器学习,掌握机器学习的概念和常用的算法。
相关文章
|
16天前
|
机器学习/深度学习 自然语言处理 并行计算
DeepSpeed分布式训练框架深度学习指南
【11月更文挑战第6天】随着深度学习模型规模的日益增大,训练这些模型所需的计算资源和时间成本也随之增加。传统的单机训练方式已难以应对大规模模型的训练需求。
62 3
|
18天前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
20天前
|
机器学习/深度学习 并行计算 Java
谈谈分布式训练框架DeepSpeed与Megatron
【11月更文挑战第3天】随着深度学习技术的不断发展,大规模模型的训练需求日益增长。为了应对这种需求,分布式训练框架应运而生,其中DeepSpeed和Megatron是两个备受瞩目的框架。本文将深入探讨这两个框架的背景、业务场景、优缺点、主要功能及底层实现逻辑,并提供一个基于Java语言的简单demo例子,帮助读者更好地理解这些技术。
43 2
|
3月前
|
UED 存储 数据管理
深度解析 Uno Platform 离线状态处理技巧:从网络检测到本地存储同步,全方位提升跨平台应用在无网环境下的用户体验与数据管理策略
【8月更文挑战第31天】处理离线状态下的用户体验是现代应用开发的关键。本文通过在线笔记应用案例,介绍如何使用 Uno Platform 优雅地应对离线状态。首先,利用 `NetworkInformation` 类检测网络状态;其次,使用 SQLite 实现离线存储;然后,在网络恢复时同步数据;最后,通过 UI 反馈提升用户体验。
88 0
|
1月前
|
算法 PyTorch 算法框架/工具
Pytorch学习笔记(九):Pytorch模型的FLOPs、模型参数量等信息输出(torchstat、thop、ptflops、torchsummary)
本文介绍了如何使用torchstat、thop、ptflops和torchsummary等工具来计算Pytorch模型的FLOPs、模型参数量等信息。
188 2
|
1月前
|
机器学习/深度学习 自然语言处理 监控
利用 PyTorch Lightning 搭建一个文本分类模型
利用 PyTorch Lightning 搭建一个文本分类模型
57 8
利用 PyTorch Lightning 搭建一个文本分类模型
|
1月前
|
机器学习/深度学习 自然语言处理 数据建模
三种Transformer模型中的注意力机制介绍及Pytorch实现:从自注意力到因果自注意力
本文深入探讨了Transformer模型中的三种关键注意力机制:自注意力、交叉注意力和因果自注意力,这些机制是GPT-4、Llama等大型语言模型的核心。文章不仅讲解了理论概念,还通过Python和PyTorch从零开始实现这些机制,帮助读者深入理解其内部工作原理。自注意力机制通过整合上下文信息增强了输入嵌入,多头注意力则通过多个并行的注意力头捕捉不同类型的依赖关系。交叉注意力则允许模型在两个不同输入序列间传递信息,适用于机器翻译和图像描述等任务。因果自注意力确保模型在生成文本时仅考虑先前的上下文,适用于解码器风格的模型。通过本文的详细解析和代码实现,读者可以全面掌握这些机制的应用潜力。
58 3
三种Transformer模型中的注意力机制介绍及Pytorch实现:从自注意力到因果自注意力
|
2月前
|
机器学习/深度学习 PyTorch 调度
在Pytorch中为不同层设置不同学习率来提升性能,优化深度学习模型
在深度学习中,学习率作为关键超参数对模型收敛速度和性能至关重要。传统方法采用统一学习率,但研究表明为不同层设置差异化学习率能显著提升性能。本文探讨了这一策略的理论基础及PyTorch实现方法,包括模型定义、参数分组、优化器配置及训练流程。通过示例展示了如何为ResNet18设置不同层的学习率,并介绍了渐进式解冻和层适应学习率等高级技巧,帮助研究者更好地优化模型训练。
141 4
在Pytorch中为不同层设置不同学习率来提升性能,优化深度学习模型
|
2月前
|
机器学习/深度学习 监控 PyTorch
PyTorch 模型调试与故障排除指南
在深度学习领域,PyTorch 成为开发和训练神经网络的主要框架之一。本文为 PyTorch 开发者提供全面的调试指南,涵盖从基础概念到高级技术的内容。目标读者包括初学者、中级开发者和高级工程师。本文探讨常见问题及解决方案,帮助读者理解 PyTorch 的核心概念、掌握调试策略、识别性能瓶颈,并通过实际案例获得实践经验。无论是在构建简单神经网络还是复杂模型,本文都将提供宝贵的洞察和实用技巧,帮助开发者更高效地开发和优化 PyTorch 模型。
43 3
PyTorch 模型调试与故障排除指南
|
1月前
|
存储 并行计算 PyTorch
探索PyTorch:模型的定义和保存方法
探索PyTorch:模型的定义和保存方法