阿里云DLC运行DDP Sample

简介: PAI提供的云原生基础AI平台,提供灵活、稳定、易用和高性能的机器学习训练环境。该平台支持多种算法框架、超大规模分布式深度学习任务运行及自定义算法框架。本文演示如何在DLC上面运行Pytorch DDP任务。

一、Code Sample


from __future__ import print_function
import os
import time
import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR
import torch.distributed as dist

def init_distributed_mode(args):
    '''initilize DDP
    '''
    if "RANK" in os.environ and "WORLD_SIZE" in os.environ:
        args.rank = int(os.environ["RANK"])
        args.world_size = int(os.environ["WORLD_SIZE"])
        args.gpu = int(os.environ["LOCAL_RANK"])
    elif "SLURM_PROCID" in os.environ:
        args.rank = int(os.environ["SLURM_PROCID"])
        args.gpu = args.rank % torch.cuda.device_count()
    elif hasattr(args, "rank"):
        pass
    else:
        print("Not using distributed mode")
        args.distributed = False
        return

    args.distributed = True

    torch.cuda.set_device(args.gpu)
    args.dist_backend = "nccl"
    # args.dist_backend = "gloo"
    print(f"| distributed init (rank {args.rank}): {args.dist_url}, local rank:{args.gpu}, world size:{args.world_size}", flush=True)
    dist.init_process_group(
        backend=args.dist_backend, init_method=args.dist_url, world_size=args.world_size, rank=args.rank
    )

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output


def train(args, model, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if args.distributed:
            if dist.get_rank() == 0:
                if batch_idx % args.log_interval == 0:
                    print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                         epoch, dist.get_world_size() * batch_idx * len(data), len(train_loader.dataset),
                         100. * batch_idx / len(train_loader), loss.item()))
        else:
            if batch_idx % args.log_interval == 0:
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                       epoch,  batch_idx * len(data), len(train_loader.dataset),
                       100. * batch_idx / len(train_loader), loss.item()))
        if args.dry_run:
            break


def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))


def main():
    # Training settings
    parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                        help='input batch size for training (default: 64)')
    parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                        help='input batch size for testing (default: 1000)')
    parser.add_argument('--epochs', type=int, default=14, metavar='N',
                        help='number of epochs to train (default: 14)')
    parser.add_argument('--lr', type=float, default=1.0, metavar='LR',
                        help='learning rate (default: 1.0)')
    parser.add_argument('--gamma', type=float, default=0.7, metavar='M',
                        help='Learning rate step gamma (default: 0.7)')
    parser.add_argument('--no-cuda', action='store_true', default=False,
                        help='disables CUDA training')
    parser.add_argument('--dry-run', action='store_true', default=False,
                        help='quickly check a single pass')
    parser.add_argument('--seed', type=int, default=1, metavar='S',
                        help='random seed (default: 1)')
    parser.add_argument('--log-interval', type=int, default=10, metavar='N',
                        help='how many batches to wait before logging training status')
    parser.add_argument('--save-model', action='store_true', default=False,
                        help='For Saving the current Model')

    parser.add_argument('--local_rank', type=int, help='local rank, will passed by ddp')
    parser.add_argument("--world-size", default=1, type=int, help="number of distributed processes")
    parser.add_argument("--dist-url", default="env://", type=str, help="url used to set up distributed training")
    args = parser.parse_args()
    use_cuda = not args.no_cuda and torch.cuda.is_available()

    init_distributed_mode(args)

    torch.manual_seed(args.seed)

    device = torch.device("cuda" if use_cuda else "cpu")

    train_kwargs = {'batch_size': args.batch_size}
    test_kwargs = {'batch_size': args.test_batch_size}
    if use_cuda:
        cuda_kwargs = {'num_workers': 1,
                       'pin_memory': True,
                       }
        train_kwargs.update(cuda_kwargs)
        test_kwargs.update(cuda_kwargs)

    transform=transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
        ])
    dataset_train = datasets.MNIST('./data', train=True, download=True,
                       transform=transform)
    dataset_val = datasets.MNIST('./data', train=False,
                       transform=transform)
    if args.distributed:
        train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train, shuffle=True)
    else:
        train_sampler = torch.utils.data.RandomSampler(dataset_train)
    test_sampler = torch.utils.data.SequentialSampler(dataset_val)

    train_loader = torch.utils.data.DataLoader(dataset_train, sampler = train_sampler, **train_kwargs)
    test_loader = torch.utils.data.DataLoader(dataset_val, sampler = test_sampler, **test_kwargs)

    model = Net().to(device)
    model_without_ddp = model
    if args.distributed:
        model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])
        model_without_ddp = model.module

    optimizer = optim.Adadelta(model.parameters(), lr=args.lr)

    scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)
    for epoch in range(1, args.epochs + 1):
        if args.distributed:
            train_sampler.set_epoch(epoch)
        train(args, model, device, train_loader, optimizer, epoch)
        if args.distributed:
            # Only run validation on GPU 0 process, for simplity, so we do not run validation on multi gpu.
            if dist.get_rank() == 0:
                test(model_without_ddp, device, test_loader)
        else:
            test(model, device, test_loader)
        scheduler.step()

    if args.save_model:
        if args.distributed:
            if dist.get_rank() == 0:
                # only save model on GPU0 process.
                torch.save(model.state_dict(), f"mnist_cnn.pt")
        else:
            torch.save(model.state_dict(), f"mnist_cnn_.pt")


if __name__ == '__main__':
    start = time.time()
    main()
    print(f'Total cost time:{time.time() - start} ms')

二、将python文件上传的oss

如果有配置数据源nas等,也可以直接将文件上传到nas中

三、创建任务

图片.png

图片.png

  • 启动脚本示例
wget https://********.oss-cn-hangzhou.aliyuncs.com/mnist-ddp.py &&   python -m torch.distributed.launch --master_addr=$MASTER_ADDR --master_port=$MASTER_PORT --nproc_per_node=1 --nnodes=$WORLD_SIZE --node_rank=$RANK mnist-ddp.py

四、察看运行结果

图片.png

图片.png

参考链接

提交分布式DLC任务

相关实践学习
使用PAI+LLaMA Factory微调Qwen2-VL模型,搭建文旅领域知识问答机器人
使用PAI和LLaMA Factory框架,基于全参方法微调 Qwen2-VL模型,使其能够进行文旅领域知识问答,同时通过人工测试验证了微调的效果。
机器学习概览及常见算法
机器学习(Machine Learning, ML)是人工智能的核心,专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能,它是使计算机具有智能的根本途径,其应用遍及人工智能的各个领域。 本课程将带你入门机器学习,掌握机器学习的概念和常用的算法。
相关文章
|
4月前
|
人工智能 数据可视化 机器人
2026年萌新零基础部署OpenClaw(Clawdbot)接入钉钉保姆级教程
在2026年AI自动化办公爆发的时代,OpenClaw(原Clawdbot,曾用名Moltbot)作为阿里云生态下开源的AI自动化代理工具,凭借自然语言交互、全场景任务自动化、插件化扩展的核心优势,成为个人办公提效、企业轻量化数字化转型的核心选择。与传统聊天机器人不同,OpenClaw并非单纯的对话工具,而是能实现“需求解析-任务规划-工具调用-结果反馈”的完整自动化系统,可轻松完成邮件管理、日程规划、网页抓取、多平台协同等实操任务,真正实现解放双手、提升效率的核心需求。
1428 25
|
人工智能 自然语言处理 API
阿里云百炼产品月刊【2025年3月】
2025年3月的阿⾥云百炼平台月刊突出展示了其在AI模型和服务上的显著进展。本期亮点包括推出了多个先进的多模态模型,如qwen2.5-omni-7b和视觉推理模型qvq-max系列,大幅提升了文本、图像、语音和视频的处理能力,并降低了计算成本。此外,平台引入了精准的语音识别和翻译模型gummy-realtime-v1及gummy-chat-v1,支持多语言实时交互。为了促进应用开发,阿里云百炼平台还发布了开源推理模型qwq-32b,以及一系列优化的智能体应用模型,增强了自动化和交互性。最后,通过新增周边查询插件和基于MCP的析⾔服务,进一步扩展了平台的功能和服务范围。
1588 8
|
4月前
|
人工智能 自然语言处理 监控
2026年零基础部署OpenClaw(Clawdbot)教程及OpenClaw必装Skill+实战案例分享
2026年初,一款名为OpenClaw(原Clawdbot、Moltbot)的AI工具以现象级速度席卷全球科技圈,凭借72小时狂揽60000+GitHub Stars的成绩,如今星标数已突破180000+,不仅让Mac Mini全球卖断货,更带动Cloudflare股价上涨20%。这款被网友亲切称为Molty“小龙虾”的工具,并非普通的聊天机器人,而是真正“长了手的AI助理”——它能通过Telegram、飞书等10+渠道主动执行任务,从网站重建、买车砍价到深夜Bug修复,真正实现“聊天框里办大事”。
2020 4
|
自然语言处理 算法 API
阿里云增值税发票识别NET Rest API调用示例
本文介绍了使用NET代码调用阿里云增值税发票识别API的实现方式。通过示例代码,详细展示了如何构造请求、设置签名以及发送HTTP请求的具体步骤。代码中涵盖了请求参数的处理、签名生成逻辑(如HMAC-SHA256算法)以及调用API后的结果处理。此外,还提供了运行结果的截图和参考文档链接,帮助开发者更好地理解和应用该接口。
1408 4
|
分布式计算 DataWorks 数据处理
"DataWorks高级技巧揭秘:手把手教你如何在PyODPS节点中将模型一键写入OSS,实现数据处理的完美闭环!"
【10月更文挑战第23天】DataWorks是企业级的云数据开发管理平台,支持强大的数据处理和分析功能。通过PyODPS节点,用户可以编写Python代码执行ODPS任务。本文介绍了如何在DataWorks中训练模型并将其保存到OSS的详细步骤和示例代码,包括初始化ODPS和OSS服务、读取数据、训练模型、保存模型到OSS等关键步骤。
951 3
|
Java 编译器
Java 泛型详细解析
本文将带你详细解析 Java 泛型,了解泛型的原理、常见的使用方法以及泛型的局限性,让你对泛型有更深入的了解。
573 2
Java 泛型详细解析
|
弹性计算 应用服务中间件 定位技术
基于地理位置的访问策略的GA加速最佳实践
全球加速GA是阿里云提供的全球网络加速服务,支持基于地理位置的访问策略。本文介绍如何通过多组GA实例组合,实现一个域名在全球多个区域的服务同步加速。具体步骤包括创建ECS实例、部署Nginx服务器、配置GA及全局流量管理器等。
653 4
|
弹性计算 应用服务中间件 定位技术
阿里云基于Anycast弹性公网IP实现多源站的就近访问加速
本文介绍了如何使用阿里云Anycast弹性公网IP实现基于地理位置的访问策略,通过在不同地区部署ECS服务器并绑定Anycast实例,实现就近加速访问。具体步骤包括创建ECS、创建Anycast实例、绑定资源和测试效果。
621 1
|
设计模式 前端开发 数据可视化
LiveCharts2:简单灵活交互式且功能强大的.NET图表库
LiveCharts2:简单灵活交互式且功能强大的.NET图表库
1416 0

热门文章

最新文章