PyTorch 2.2 中文官方教程(十九)(1)

简介: PyTorch 2.2 中文官方教程(十九)

使用 RPC 进行分布式管道并行

原文:pytorch.org/tutorials/intermediate/dist_pipeline_parallel_tutorial.html

译者:飞龙

协议:CC BY-NC-SA 4.0

作者:Shen Li

注意

github中查看并编辑本教程。

先决条件:

本教程使用 Resnet50 模型演示了如何使用torch.distributed.rpc API 实现分布式管道并行。这可以看作是单机模型并行最佳实践中讨论的多 GPU 管道并行的分布式对应。

注意

本教程要求使用 PyTorch v1.6.0 或更高版本。

注意

本教程的完整源代码可以在pytorch/examples找到。

基础知识

之前的教程开始使用分布式 RPC 框架展示了如何使用torch.distributed.rpc为 RNN 模型实现分布式模型并行。该教程使用一个 GPU 来托管EmbeddingTable,提供的代码可以正常工作。但是,如果一个模型存在于多个 GPU 上,就需要一些额外的步骤来增加所有 GPU 的摊销利用率。管道并行是一种可以在这种情况下有所帮助的范式之一。

在本教程中,我们以ResNet50作为示例模型,该模型也被单机模型并行最佳实践教程使用。类似地,ResNet50模型被分成两个分片,并且输入批次被分成多个部分并以流水线方式馈送到两个模型分片中。不同之处在于,本教程使用异步 RPC 调用来并行执行,而不是使用 CUDA 流来并行执行。因此,本教程中提出的解决方案也适用于跨机器边界。本教程的其余部分将以四个步骤呈现实现。

步骤 1:对 ResNet50 模型进行分区

这是准备步骤,实现了在两个模型分片中的ResNet50。下面的代码是从torchvision 中的 ResNet 实现借用的。ResNetBase模块包含了两个 ResNet 分片的共同构建块和属性。

import threading
import torch
import torch.nn as nn
from torchvision.models.resnet import Bottleneck
num_classes = 1000
def conv1x1(in_planes, out_planes, stride=1):
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)
class ResNetBase(nn.Module):
    def __init__(self, block, inplanes, num_classes=1000,
                groups=1, width_per_group=64, norm_layer=None):
        super(ResNetBase, self).__init__()
        self._lock = threading.Lock()
        self._block = block
        self._norm_layer = nn.BatchNorm2d
        self.inplanes = inplanes
        self.dilation = 1
        self.groups = groups
        self.base_width = width_per_group
    def _make_layer(self, planes, blocks, stride=1):
        norm_layer = self._norm_layer
        downsample = None
        previous_dilation = self.dilation
        if stride != 1 or self.inplanes != planes * self._block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * self._block.expansion, stride),
                norm_layer(planes * self._block.expansion),
            )
        layers = []
        layers.append(self._block(self.inplanes, planes, stride, downsample, self.groups,
                                self.base_width, previous_dilation, norm_layer))
        self.inplanes = planes * self._block.expansion
        for _ in range(1, blocks):
            layers.append(self._block(self.inplanes, planes, groups=self.groups,
                                    base_width=self.base_width, dilation=self.dilation,
                                    norm_layer=norm_layer))
        return nn.Sequential(*layers)
    def parameter_rrefs(self):
        return [RRef(p) for p in self.parameters()] 

现在,我们准备定义两个模型分片。对于构造函数,我们简单地将所有 ResNet50 层分成两部分,并将每部分移动到提供的设备上。这两个分片的forward函数接受输入数据的RRef,在本地获取数据,然后将其移动到预期的设备上。在将所有层应用于输入后,将输出移动到 CPU 并返回。这是因为 RPC API 要求张量驻留在 CPU 上,以避免在调用方和被调用方的设备数量不匹配时出现无效设备错误。

class ResNetShard1(ResNetBase):
    def __init__(self, device, *args, **kwargs):
        super(ResNetShard1, self).__init__(
            Bottleneck, 64, num_classes=num_classes, *args, **kwargs)
        self.device = device
        self.seq = nn.Sequential(
            nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False),
            self._norm_layer(self.inplanes),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
            self._make_layer(64, 3),
            self._make_layer(128, 4, stride=2)
        ).to(self.device)
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
    def forward(self, x_rref):
        x = x_rref.to_here().to(self.device)
        with self._lock:
            out =  self.seq(x)
        return out.cpu()
class ResNetShard2(ResNetBase):
    def __init__(self, device, *args, **kwargs):
        super(ResNetShard2, self).__init__(
            Bottleneck, 512, num_classes=num_classes, *args, **kwargs)
        self.device = device
        self.seq = nn.Sequential(
            self._make_layer(256, 6, stride=2),
            self._make_layer(512, 3, stride=2),
            nn.AdaptiveAvgPool2d((1, 1)),
        ).to(self.device)
        self.fc =  nn.Linear(512 * self._block.expansion, num_classes).to(self.device)
    def forward(self, x_rref):
        x = x_rref.to_here().to(self.device)
        with self._lock:
            out = self.fc(torch.flatten(self.seq(x), 1))
        return out.cpu()

步骤 2:将 ResNet50 模型分片拼接成一个模块

然后,我们创建一个DistResNet50模块来组装两个分片并实现管道并行逻辑。在构造函数中,我们使用两个rpc.remote调用分别将两个分片放在两个不同的 RPC 工作进程上,并保留两个模型部分的RRef,以便它们可以在前向传递中引用。forward函数将输入批次分成多个微批次,并以管道方式将这些微批次馈送到两个模型部分。它首先使用rpc.remote调用将第一个分片应用于微批次,然后将返回的中间输出RRef转发到第二个模型分片。之后,它收集所有微输出的Future,并在循环后等待所有微输出。请注意,remote()rpc_async()都会立即返回并异步运行。因此,整个循环是非阻塞的,并且将同时启动多个 RPC。通过中间输出y_rref保留了两个模型部分上一个微批次的执行顺序。跨微批次的执行顺序并不重要。最后,forward 函数将所有微批次的输出连接成一个单一的输出张量并返回。parameter_rrefs函数是一个辅助函数,用于简化分布式优化器的构建,稍后将使用它。

class DistResNet50(nn.Module):
    def __init__(self, num_split, workers, *args, **kwargs):
        super(DistResNet50, self).__init__()
        self.num_split = num_split
        # Put the first part of the ResNet50 on workers[0]
        self.p1_rref = rpc.remote(
            workers[0],
            ResNetShard1,
            args = ("cuda:0",) + args,
            kwargs = kwargs
        )
        # Put the second part of the ResNet50 on workers[1]
        self.p2_rref = rpc.remote(
            workers[1],
            ResNetShard2,
            args = ("cuda:1",) + args,
            kwargs = kwargs
        )
    def forward(self, xs):
        out_futures = []
        for x in iter(xs.split(self.num_split, dim=0)):
            x_rref = RRef(x)
            y_rref = self.p1_rref.remote().forward(x_rref)
            z_fut = self.p2_rref.rpc_async().forward(y_rref)
            out_futures.append(z_fut)
        return torch.cat(torch.futures.wait_all(out_futures))
    def parameter_rrefs(self):
        remote_params = []
        remote_params.extend(self.p1_rref.remote().parameter_rrefs().to_here())
        remote_params.extend(self.p2_rref.remote().parameter_rrefs().to_here())
        return remote_params 

步骤 3:定义训练循环

在定义模型之后,让我们实现训练循环。我们使用一个专用的“主”工作进程来准备随机输入和标签,并控制分布式反向传递和分布式优化器步骤。首先创建一个DistResNet50模块的实例。它指定每个批次的微批次数量,并提供两个 RPC 工作进程的名称(即“worker1”和“worker2”)。然后定义损失函数,并使用parameter_rrefs()助手创建一个DistributedOptimizer来获取参数RRefs的列表。然后,主要训练循环与常规本地训练非常相似,只是它使用dist_autograd来启动反向传递,并为反向传递和优化器step()提供context_id

import torch.distributed.autograd as dist_autograd
import torch.optim as optim
from torch.distributed.optim import DistributedOptimizer
num_batches = 3
batch_size = 120
image_w = 128
image_h = 128
def run_master(num_split):
    # put the two model parts on worker1 and worker2 respectively
    model = DistResNet50(num_split, ["worker1", "worker2"])
    loss_fn = nn.MSELoss()
    opt = DistributedOptimizer(
        optim.SGD,
        model.parameter_rrefs(),
        lr=0.05,
    )
    one_hot_indices = torch.LongTensor(batch_size) \
                        .random_(0, num_classes) \
                        .view(batch_size, 1)
    for i in range(num_batches):
        print(f"Processing batch {i}")
        # generate random inputs and labels
        inputs = torch.randn(batch_size, 3, image_w, image_h)
        labels = torch.zeros(batch_size, num_classes) \
                    .scatter_(1, one_hot_indices, 1)
        with dist_autograd.context() as context_id:
            outputs = model(inputs)
            dist_autograd.backward(context_id, [loss_fn(outputs, labels)])
            opt.step(context_id) 

步骤 4:启动 RPC 进程

最后,下面的代码展示了所有进程的目标函数。主要逻辑在run_master中定义。工作进程 passively 等待来自主进程的命令,因此只需运行init_rpcshutdown,其中shutdown默认情况下将阻塞,直到所有 RPC 参与者完成。

import os
import time
import torch.multiprocessing as mp
def run_worker(rank, world_size, num_split):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=128)
    if rank == 0:
        rpc.init_rpc(
            "master",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=options
        )
        run_master(num_split)
    else:
        rpc.init_rpc(
            f"worker{rank}",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=options
        )
        pass
    # block until all rpcs finish
    rpc.shutdown()
if __name__=="__main__":
    world_size = 3
    for num_split in [1, 2, 4, 8]:
        tik = time.time()
        mp.spawn(run_worker, args=(world_size, num_split), nprocs=world_size, join=True)
        tok = time.time()
        print(f"number of splits = {num_split}, execution time = {tok  -  tik}") 

使用异步执行实现批量 RPC 处理

原文:pytorch.org/tutorials/intermediate/rpc_async_execution.html

译者:飞龙

协议:CC BY-NC-SA 4.0

作者Shen Li

注意

github中查看和编辑本教程。

先决条件:

  • PyTorch 分布式概述
  • 使用分布式 RPC 框架入门
  • 使用分布式 RPC 框架实现参数服务器
  • RPC 异步执行装饰器

本教程演示了如何使用@rpc.functions.async_execution装饰器构建批处理 RPC 应用程序,通过减少阻塞的 RPC 线程数量和在被调用方上合并 CUDA 操作来加速训练。这与TorchServe 的批量推理的思想相同。

注意

本教程需要 PyTorch v1.6.0 或更高版本。

基础知识

以前的教程展示了使用torch.distributed.rpc构建分布式训练应用程序的步骤,但没有详细说明在处理 RPC 请求时被调用方发生了什么。在 PyTorch v1.5 中,每个 RPC 请求将阻塞被调用方的一个线程来执行该请求中的函数,直到该函数返回。这对许多用例有效,但有一个注意事项。如果用户函数在 IO 上阻塞,例如,嵌套的 RPC 调用,或者信号,例如,等待不同的 RPC 请求解除阻塞,那么被调用方上的 RPC 线程将不得不空闲等待,直到 IO 完成或信号事件发生。结果,RPC 被调用方可能会使用比必要更多的线程。这个问题的原因是 RPC 将用户函数视为黑匣子,并且对函数中发生的事情知之甚少。为了允许用户函数产生并释放 RPC 线程,需要向 RPC 系统提供更多提示。

自 v1.6.0 以来,PyTorch 通过引入两个新概念来解决这个问题:

有了这两个工具,应用代码可以将用户函数分解为多个较小的函数,将它们链接为Future对象上的回调,并返回包含最终结果的Future。在被调用方,当获取Future对象时,也会安装后续的 RPC 响应准备和通信作为回调,当最终结果准备好时将被触发。这样,被调用方不再需要阻塞一个线程并等待最终返回值准备好。请参考@rpc.functions.async_execution的 API 文档,了解简单示例。

除了减少被调用方上的空闲线程数量外,这些工具还有助于使批处理 RPC 处理更加简单和快速。本教程的以下两个部分演示了如何使用 @rpc.functions.async_execution 装饰器构建分布式批量更新参数服务器和批处理强化学习应用程序。

批量更新参数服务器

考虑一个具有一个参数服务器(PS)和多个训练器的同步参数服务器训练应用程序。在此应用程序中,PS 持有参数并等待所有训练器报告梯度。在每次迭代中,它等待直到从所有训练器接收到梯度,然后一次性更新所有参数。下面的代码显示了 PS 类的实现。update_and_fetch_model 方法使用 @rpc.functions.async_execution 装饰,并将被训练器调用。每次调用都返回一个将填充更新模型的 Future 对象。大多数训练器发起的调用只是将梯度累积到 .grad 字段中,立即返回,并在 PS 上释放 RPC 线程。最后到达的训练器将触发优化器步骤并消耗所有先前报告的梯度。然后它使用更新后的模型设置 future_model,进而通过 Future 对象通知其他训练器的所有先前请求,并将更新后的模型发送给所有训练器。

import threading
import torchvision
import torch
import torch.distributed.rpc as rpc
from torch import optim
num_classes, batch_update_size = 30, 5
class BatchUpdateParameterServer(object):
    def __init__(self, batch_update_size=batch_update_size):
        self.model = torchvision.models.resnet50(num_classes=num_classes)
        self.lock = threading.Lock()
        self.future_model = torch.futures.Future()
        self.batch_update_size = batch_update_size
        self.curr_update_size = 0
        self.optimizer = optim.SGD(self.model.parameters(), lr=0.001, momentum=0.9)
        for p in self.model.parameters():
            p.grad = torch.zeros_like(p)
    def get_model(self):
        return self.model
    @staticmethod
    @rpc.functions.async_execution
    def update_and_fetch_model(ps_rref, grads):
        # Using the RRef to retrieve the local PS instance
        self = ps_rref.local_value()
        with self.lock:
            self.curr_update_size += 1
            # accumulate gradients into .grad field
            for p, g in zip(self.model.parameters(), grads):
                p.grad += g
            # Save the current future_model and return it to make sure the
            # returned Future object holds the correct model even if another
            # thread modifies future_model before this thread returns.
            fut = self.future_model
            if self.curr_update_size >= self.batch_update_size:
                # update the model
                for p in self.model.parameters():
                    p.grad /= self.batch_update_size
                self.curr_update_size = 0
                self.optimizer.step()
                self.optimizer.zero_grad()
                # by settiing the result on the Future object, all previous
                # requests expecting this updated model will be notified and
                # the their responses will be sent accordingly.
                fut.set_result(self.model)
                self.future_model = torch.futures.Future()
        return fut 

对于训练器,它们都使用来自 PS 的相同参数集进行初始化。在每次迭代中,每个训练器首先运行前向和反向传递以在本地生成梯度。然后,每个训练器使用 RPC 报告其梯度给 PS,并通过相同 RPC 请求的返回值获取更新后的参数。在训练器的实现中,目标函数是否标记为 @rpc.functions.async_execution 都没有区别。训练器只需使用 rpc_sync 调用 update_and_fetch_model,它将在训练器上阻塞,直到更新的模型返回。

batch_size, image_w, image_h  = 20, 64, 64
class Trainer(object):
    def __init__(self, ps_rref):
        self.ps_rref, self.loss_fn = ps_rref, torch.nn.MSELoss()
        self.one_hot_indices = torch.LongTensor(batch_size) \
                                    .random_(0, num_classes) \
                                    .view(batch_size, 1)
    def get_next_batch(self):
        for _ in range(6):
            inputs = torch.randn(batch_size, 3, image_w, image_h)
            labels = torch.zeros(batch_size, num_classes) \
                        .scatter_(1, self.one_hot_indices, 1)
            yield inputs.cuda(), labels.cuda()
    def train(self):
        name = rpc.get_worker_info().name
        # get initial model parameters
        m = self.ps_rref.rpc_sync().get_model().cuda()
        # start training
        for inputs, labels in self.get_next_batch():
            self.loss_fn(m(inputs), labels).backward()
            m = rpc.rpc_sync(
                self.ps_rref.owner(),
                BatchUpdateParameterServer.update_and_fetch_model,
                args=(self.ps_rref, [p.grad for p in m.cpu().parameters()]),
            ).cuda() 

本教程跳过了启动多个进程的代码,请参考 examples 仓库获取完整实现。请注意,可以在不使用 @rpc.functions.async_execution 装饰器的情况下实现批处理。但是,这将要求在 PS 上阻塞更多的 RPC 线程,或者使用另一轮 RPC 来获取更新的模型,后者将增加代码复杂性和通信开销。

本节使用一个简单的参数服务器训练示例来展示如何使用 @rpc.functions.async_execution 装饰器实现批处理 RPC 应用程序。在下一节中,我们将使用批处理重新实现之前的 使用分布式 RPC 框架入门 教程中的强化学习示例,并展示其对训练速度的影响。

PyTorch 2.2 中文官方教程(十九)(2)https://developer.aliyun.com/article/1482622

相关实践学习
部署Stable Diffusion玩转AI绘画(GPU云服务器)
本实验通过在ECS上从零开始部署Stable Diffusion来进行AI绘画创作,开启AIGC盲盒。
相关文章
|
1月前
|
存储 物联网 PyTorch
基于PyTorch的大语言模型微调指南:Torchtune完整教程与代码示例
**Torchtune**是由PyTorch团队开发的一个专门用于LLM微调的库。它旨在简化LLM的微调流程,提供了一系列高级API和预置的最佳实践
172 59
基于PyTorch的大语言模型微调指南:Torchtune完整教程与代码示例
|
1月前
|
并行计算 监控 搜索推荐
使用 PyTorch-BigGraph 构建和部署大规模图嵌入的完整教程
当处理大规模图数据时,复杂性难以避免。PyTorch-BigGraph (PBG) 是一款专为此设计的工具,能够高效处理数十亿节点和边的图数据。PBG通过多GPU或节点无缝扩展,利用高效的分区技术,生成准确的嵌入表示,适用于社交网络、推荐系统和知识图谱等领域。本文详细介绍PBG的设置、训练和优化方法,涵盖环境配置、数据准备、模型训练、性能优化和实际应用案例,帮助读者高效处理大规模图数据。
54 5
|
4月前
|
并行计算 Ubuntu PyTorch
Ubuntu下CUDA、Conda、Pytorch联合教程
本文是一份Ubuntu系统下安装和配置CUDA、Conda和Pytorch的教程,涵盖了查看显卡驱动、下载安装CUDA、添加环境变量、卸载CUDA、Anaconda的下载安装、环境管理以及Pytorch的安装和验证等步骤。
817 1
Ubuntu下CUDA、Conda、Pytorch联合教程
|
7月前
|
机器学习/深度学习 PyTorch 算法框架/工具
PyTorch 2.2 中文官方教程(十八)(4)
PyTorch 2.2 中文官方教程(十八)
120 1
|
7月前
|
PyTorch 算法框架/工具 异构计算
PyTorch 2.2 中文官方教程(二十)(4)
PyTorch 2.2 中文官方教程(二十)
138 0
PyTorch 2.2 中文官方教程(二十)(4)
|
7月前
|
Android开发 PyTorch 算法框架/工具
PyTorch 2.2 中文官方教程(二十)(2)
PyTorch 2.2 中文官方教程(二十)
120 0
PyTorch 2.2 中文官方教程(二十)(2)
|
7月前
|
iOS开发 PyTorch 算法框架/工具
PyTorch 2.2 中文官方教程(二十)(1)
PyTorch 2.2 中文官方教程(二十)
118 0
PyTorch 2.2 中文官方教程(二十)(1)
|
7月前
|
PyTorch 算法框架/工具 异构计算
PyTorch 2.2 中文官方教程(十九)(3)
PyTorch 2.2 中文官方教程(十九)
72 0
PyTorch 2.2 中文官方教程(十九)(3)
|
7月前
|
异构计算 PyTorch 算法框架/工具
PyTorch 2.2 中文官方教程(十九)(2)
PyTorch 2.2 中文官方教程(十九)
103 0
PyTorch 2.2 中文官方教程(十九)(2)
|
7月前
|
PyTorch 算法框架/工具 并行计算
PyTorch 2.2 中文官方教程(二十)(3)
PyTorch 2.2 中文官方教程(二十)
183 0