使用 RPC 进行分布式管道并行
作者:Shen Li
本教程使用 Resnet50 模型演示了如何使用torch.distributed.rpc API 实现分布式管道并行。这可以看作是单机模型并行最佳实践中讨论的多 GPU 管道并行的分布式对应。
本教程要求使用 PyTorch v1.6.0 或更高版本。
之前的教程开始使用分布式 RPC 框架展示了如何使用torch.distributed.rpc为 RNN 模型实现分布式模型并行。该教程使用一个 GPU 来托管EmbeddingTable
,提供的代码可以正常工作。但是,如果一个模型存在于多个 GPU 上,就需要一些额外的步骤来增加所有 GPU 的摊销利用率。管道并行是一种可以在这种情况下有所帮助的范式之一。
模型被分成两个分片,并且输入批次被分成多个部分并以流水线方式馈送到两个模型分片中。不同之处在于,本教程使用异步 RPC 调用来并行执行,而不是使用 CUDA 流来并行执行。因此,本教程中提出的解决方案也适用于跨机器边界。本教程的其余部分将以四个步骤呈现实现。
步骤 1:对 ResNet50 模型进行分区
。下面的代码是从torchvision 中的 ResNet 实现借用的。ResNetBase
模块包含了两个 ResNet 分片的共同构建块和属性。
import threading import torch import torch.nn as nn from torchvision.models.resnet import Bottleneck num_classes = 1000 def conv1x1(in_planes, out_planes, stride=1): return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False) class ResNetBase(nn.Module): def __init__(self, block, inplanes, num_classes=1000, groups=1, width_per_group=64, norm_layer=None): super(ResNetBase, self).__init__() self._lock = threading.Lock() self._block = block self._norm_layer = nn.BatchNorm2d self.inplanes = inplanes self.dilation = 1 self.groups = groups self.base_width = width_per_group def _make_layer(self, planes, blocks, stride=1): norm_layer = self._norm_layer downsample = None previous_dilation = self.dilation if stride != 1 or self.inplanes != planes * self._block.expansion: downsample = nn.Sequential( conv1x1(self.inplanes, planes * self._block.expansion, stride), norm_layer(planes * self._block.expansion), ) layers = [] layers.append(self._block(self.inplanes, planes, stride, downsample, self.groups, self.base_width, previous_dilation, norm_layer)) self.inplanes = planes * self._block.expansion for _ in range(1, blocks): layers.append(self._block(self.inplanes, planes, groups=self.groups, base_width=self.base_width, dilation=self.dilation, norm_layer=norm_layer)) return nn.Sequential(*layers) def parameter_rrefs(self): return [RRef(p) for p in self.parameters()]
现在,我们准备定义两个模型分片。对于构造函数,我们简单地将所有 ResNet50 层分成两部分,并将每部分移动到提供的设备上。这两个分片的forward
,在本地获取数据,然后将其移动到预期的设备上。在将所有层应用于输入后,将输出移动到 CPU 并返回。这是因为 RPC API 要求张量驻留在 CPU 上,以避免在调用方和被调用方的设备数量不匹配时出现无效设备错误。
class ResNetShard1(ResNetBase): def __init__(self, device, *args, **kwargs): super(ResNetShard1, self).__init__( Bottleneck, 64, num_classes=num_classes, *args, **kwargs) self.device = device self.seq = nn.Sequential( nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False), self._norm_layer(self.inplanes), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=3, stride=2, padding=1), self._make_layer(64, 3), self._make_layer(128, 4, stride=2) ).to(self.device) for m in self.modules(): if isinstance(m, nn.Conv2d): nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu') elif isinstance(m, nn.BatchNorm2d): nn.init.constant_(m.weight, 1) nn.init.constant_(m.bias, 0) def forward(self, x_rref): x = x_rref.to_here().to(self.device) with self._lock: out = self.seq(x) return out.cpu() class ResNetShard2(ResNetBase): def __init__(self, device, *args, **kwargs): super(ResNetShard2, self).__init__( Bottleneck, 512, num_classes=num_classes, *args, **kwargs) self.device = device self.seq = nn.Sequential( self._make_layer(256, 6, stride=2), self._make_layer(512, 3, stride=2), nn.AdaptiveAvgPool2d((1, 1)), ).to(self.device) self.fc = nn.Linear(512 * self._block.expansion, num_classes).to(self.device) def forward(self, x_rref): x = x_rref.to_here().to(self.device) with self._lock: out = self.fc(torch.flatten(self.seq(x), 1)) return out.cpu()
步骤 2:将 ResNet50 模型分片拼接成一个模块
调用分别将两个分片放在两个不同的 RPC 工作进程上,并保留两个模型部分的RRef
都会立即返回并异步运行。因此,整个循环是非阻塞的,并且将同时启动多个 RPC。通过中间输出y_rref
保留了两个模型部分上一个微批次的执行顺序。跨微批次的执行顺序并不重要。最后,forward 函数将所有微批次的输出连接成一个单一的输出张量并返回。parameter_rrefs
class DistResNet50(nn.Module): def __init__(self, num_split, workers, *args, **kwargs): super(DistResNet50, self).__init__() self.num_split = num_split # Put the first part of the ResNet50 on workers[0] self.p1_rref = rpc.remote( workers[0], ResNetShard1, args = ("cuda:0",) + args, kwargs = kwargs ) # Put the second part of the ResNet50 on workers[1] self.p2_rref = rpc.remote( workers[1], ResNetShard2, args = ("cuda:1",) + args, kwargs = kwargs ) def forward(self, xs): out_futures = [] for x in iter(xs.split(self.num_split, dim=0)): x_rref = RRef(x) y_rref = self.p1_rref.remote().forward(x_rref) z_fut = self.p2_rref.rpc_async().forward(y_rref) out_futures.append(z_fut) return torch.cat(torch.futures.wait_all(out_futures)) def parameter_rrefs(self): remote_params = [] remote_params.extend(self.p1_rref.remote().parameter_rrefs().to_here()) remote_params.extend(self.p2_rref.remote().parameter_rrefs().to_here()) return remote_params
步骤 3:定义训练循环
模块的实例。它指定每个批次的微批次数量,并提供两个 RPC 工作进程的名称(即“worker1”和“worker2”)。然后定义损失函数,并使用parameter_rrefs()
import torch.distributed.autograd as dist_autograd import torch.optim as optim from torch.distributed.optim import DistributedOptimizer num_batches = 3 batch_size = 120 image_w = 128 image_h = 128 def run_master(num_split): # put the two model parts on worker1 and worker2 respectively model = DistResNet50(num_split, ["worker1", "worker2"]) loss_fn = nn.MSELoss() opt = DistributedOptimizer( optim.SGD, model.parameter_rrefs(), lr=0.05, ) one_hot_indices = torch.LongTensor(batch_size) \ .random_(0, num_classes) \ .view(batch_size, 1) for i in range(num_batches): print(f"Processing batch {i}") # generate random inputs and labels inputs = torch.randn(batch_size, 3, image_w, image_h) labels = torch.zeros(batch_size, num_classes) \ .scatter_(1, one_hot_indices, 1) with dist_autograd.context() as context_id: outputs = model(inputs) dist_autograd.backward(context_id, [loss_fn(outputs, labels)]) opt.step(context_id)
步骤 4:启动 RPC 进程
中定义。工作进程 passively 等待来自主进程的命令,因此只需运行init_rpc
默认情况下将阻塞,直到所有 RPC 参与者完成。
import os import time import torch.multiprocessing as mp def run_worker(rank, world_size, num_split): os.environ['MASTER_ADDR'] = 'localhost' os.environ['MASTER_PORT'] = '29500' options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=128) if rank == 0: rpc.init_rpc( "master", rank=rank, world_size=world_size, rpc_backend_options=options ) run_master(num_split) else: rpc.init_rpc( f"worker{rank}", rank=rank, world_size=world_size, rpc_backend_options=options ) pass # block until all rpcs finish rpc.shutdown() if __name__=="__main__": world_size = 3 for num_split in [1, 2, 4, 8]: tik = time.time() mp.spawn(run_worker, args=(world_size, num_split), nprocs=world_size, join=True) tok = time.time() print(f"number of splits = {num_split}, execution time = {tok - tik}")
使用异步执行实现批量 RPC 处理
作者:Shen Li
本教程演示了如何使用@rpc.functions.async_execution装饰器构建批处理 RPC 应用程序,通过减少阻塞的 RPC 线程数量和在被调用方上合并 CUDA 操作来加速训练。这与TorchServe 的批量推理的思想相同。
本教程需要 PyTorch v1.6.0 或更高版本。
以前的教程展示了使用torch.distributed.rpc构建分布式训练应用程序的步骤,但没有详细说明在处理 RPC 请求时被调用方发生了什么。在 PyTorch v1.5 中,每个 RPC 请求将阻塞被调用方的一个线程来执行该请求中的函数,直到该函数返回。这对许多用例有效,但有一个注意事项。如果用户函数在 IO 上阻塞,例如,嵌套的 RPC 调用,或者信号,例如,等待不同的 RPC 请求解除阻塞,那么被调用方上的 RPC 线程将不得不空闲等待,直到 IO 完成或信号事件发生。结果,RPC 被调用方可能会使用比必要更多的线程。这个问题的原因是 RPC 将用户函数视为黑匣子,并且对函数中发生的事情知之甚少。为了允许用户函数产生并释放 RPC 线程,需要向 RPC 系统提供更多提示。
自 v1.6.0 以来,PyTorch 通过引入两个新概念来解决这个问题:
- torch.futures.Future 类型,封装了异步执行,还支持安装回调函数。
- @rpc.functions.async_execution 装饰器允许应用告诉被调用方目标函数将返回一个 future,并且在执行过程中可以暂停和多次产生。
对象时,也会安装后续的 RPC 响应准备和通信作为回调,当最终结果准备好时将被触发。这样,被调用方不再需要阻塞一个线程并等待最终返回值准备好。请参考@rpc.functions.async_execution的 API 文档,了解简单示例。
除了减少被调用方上的空闲线程数量外,这些工具还有助于使批处理 RPC 处理更加简单和快速。本教程的以下两个部分演示了如何使用 @rpc.functions.async_execution 装饰器构建分布式批量更新参数服务器和批处理强化学习应用程序。
考虑一个具有一个参数服务器(PS)和多个训练器的同步参数服务器训练应用程序。在此应用程序中,PS 持有参数并等待所有训练器报告梯度。在每次迭代中,它等待直到从所有训练器接收到梯度,然后一次性更新所有参数。下面的代码显示了 PS 类的实现。update_and_fetch_model
方法使用 @rpc.functions.async_execution
装饰,并将被训练器调用。每次调用都返回一个将填充更新模型的 Future
对象。大多数训练器发起的调用只是将梯度累积到 .grad
字段中,立即返回,并在 PS 上释放 RPC 线程。最后到达的训练器将触发优化器步骤并消耗所有先前报告的梯度。然后它使用更新后的模型设置 future_model
,进而通过 Future
import threading import torchvision import torch import torch.distributed.rpc as rpc from torch import optim num_classes, batch_update_size = 30, 5 class BatchUpdateParameterServer(object): def __init__(self, batch_update_size=batch_update_size): self.model = torchvision.models.resnet50(num_classes=num_classes) self.lock = threading.Lock() self.future_model = torch.futures.Future() self.batch_update_size = batch_update_size self.curr_update_size = 0 self.optimizer = optim.SGD(self.model.parameters(), lr=0.001, momentum=0.9) for p in self.model.parameters(): p.grad = torch.zeros_like(p) def get_model(self): return self.model @staticmethod @rpc.functions.async_execution def update_and_fetch_model(ps_rref, grads): # Using the RRef to retrieve the local PS instance self = ps_rref.local_value() with self.lock: self.curr_update_size += 1 # accumulate gradients into .grad field for p, g in zip(self.model.parameters(), grads): p.grad += g # Save the current future_model and return it to make sure the # returned Future object holds the correct model even if another # thread modifies future_model before this thread returns. fut = self.future_model if self.curr_update_size >= self.batch_update_size: # update the model for p in self.model.parameters(): p.grad /= self.batch_update_size self.curr_update_size = 0 self.optimizer.step() self.optimizer.zero_grad() # by settiing the result on the Future object, all previous # requests expecting this updated model will be notified and # the their responses will be sent accordingly. fut.set_result(self.model) self.future_model = torch.futures.Future() return fut
对于训练器,它们都使用来自 PS 的相同参数集进行初始化。在每次迭代中,每个训练器首先运行前向和反向传递以在本地生成梯度。然后,每个训练器使用 RPC 报告其梯度给 PS,并通过相同 RPC 请求的返回值获取更新后的参数。在训练器的实现中,目标函数是否标记为 @rpc.functions.async_execution
都没有区别。训练器只需使用 rpc_sync
调用 update_and_fetch_model
batch_size, image_w, image_h = 20, 64, 64 class Trainer(object): def __init__(self, ps_rref): self.ps_rref, self.loss_fn = ps_rref, torch.nn.MSELoss() self.one_hot_indices = torch.LongTensor(batch_size) \ .random_(0, num_classes) \ .view(batch_size, 1) def get_next_batch(self): for _ in range(6): inputs = torch.randn(batch_size, 3, image_w, image_h) labels = torch.zeros(batch_size, num_classes) \ .scatter_(1, self.one_hot_indices, 1) yield inputs.cuda(), labels.cuda() def train(self): name = rpc.get_worker_info().name # get initial model parameters m = self.ps_rref.rpc_sync().get_model().cuda() # start training for inputs, labels in self.get_next_batch(): self.loss_fn(m(inputs), labels).backward() m = rpc.rpc_sync( self.ps_rref.owner(), BatchUpdateParameterServer.update_and_fetch_model, args=(self.ps_rref, [p.grad for p in m.cpu().parameters()]), ).cuda()
本教程跳过了启动多个进程的代码,请参考 examples 仓库获取完整实现。请注意,可以在不使用 @rpc.functions.async_execution 装饰器的情况下实现批处理。但是,这将要求在 PS 上阻塞更多的 RPC 线程,或者使用另一轮 RPC 来获取更新的模型,后者将增加代码复杂性和通信开销。
本节使用一个简单的参数服务器训练示例来展示如何使用 @rpc.functions.async_execution 装饰器实现批处理 RPC 应用程序。在下一节中,我们将使用批处理重新实现之前的 使用分布式 RPC 框架入门 教程中的强化学习示例,并展示其对训练速度的影响。
