PyTorch 2.2 中文官方教程(十八)(3)https://developer.aliyun.com/article/1482608
使用分布式 RPC 框架实现参数服务器
原文:
pytorch.org/tutorials/intermediate/rpc_param_server_tutorial.html
译者:飞龙
作者:Rohan Varma
注意
在github中查看并编辑本教程。
先决条件:
- PyTorch 分布式概述
- RPC API 文档
本教程演示了使用 PyTorch 的分布式 RPC 框架实现参数服务器的简单示例。参数服务器框架是一种范式,其中一组服务器存储参数,例如大型嵌入表,几个训练器查询参数服务器以检索最新的参数。这些训练器可以在本地运行训练循环,并偶尔与参数服务器同步以获取最新的参数。要了解更多关于参数服务器方法的信息,请查看这篇论文。
使用分布式 RPC 框架,我们将构建一个示例,其中多个训练器使用 RPC 与同一参数服务器通信,并使用RRef来访问远程参数服务器实例上的状态。每个训练器将通过在多个节点之间的自动求导图上进行分布式反向传递的拼接来启动其专用的反向传递。
注意:本教程涵盖了分布式 RPC 框架的使用,该框架对于将模型分割到多台机器上或实现参数服务器训练策略非常有用,其中网络训练器获取托管在不同机器上的参数。如果您想要在多个 GPU 上复制模型,请参阅分布式数据并行教程。还有另一个RPC 教程,涵盖了强化学习和 RNN 用例。
让我们从熟悉的开始:导入所需的模块并定义一个简单的 ConvNet,该网络将在 MNIST 数据集上进行训练。下面的网络主要采用自pytorch/examples repo中定义的网络。
import argparse import os import time from threading import Lock import torch import torch.distributed.autograd as dist_autograd import torch.distributed.rpc as rpc import torch.multiprocessing as mp import torch.nn as nn import torch.nn.functional as F from torch import optim from torch.distributed.optim import DistributedOptimizer from torchvision import datasets, transforms # --------- MNIST Network to train, from pytorch/examples ----- class Net(nn.Module): def __init__(self, num_gpus=0): super(Net, self).__init__() print(f"Using {num_gpus} GPUs to train") self.num_gpus = num_gpus device = torch.device( "cuda:0" if torch.cuda.is_available() and self.num_gpus > 0 else "cpu") print(f"Putting first 2 convs on {str(device)}") # Put conv layers on the first cuda device, or CPU if no cuda device self.conv1 = nn.Conv2d(1, 32, 3, 1).to(device) self.conv2 = nn.Conv2d(32, 64, 3, 1).to(device) # Put rest of the network on the 2nd cuda device, if there is one if "cuda" in str(device) and num_gpus > 1: device = torch.device("cuda:1") print(f"Putting rest of layers on {str(device)}") self.dropout1 = nn.Dropout2d(0.25).to(device) self.dropout2 = nn.Dropout2d(0.5).to(device) self.fc1 = nn.Linear(9216, 128).to(device) self.fc2 = nn.Linear(128, 10).to(device) def forward(self, x): x = self.conv1(x) x = F.relu(x) x = self.conv2(x) x = F.max_pool2d(x, 2) x = self.dropout1(x) x = torch.flatten(x, 1) # Move tensor to next device if necessary next_device = next(self.fc1.parameters()).device x = x.to(next_device) x = self.fc1(x) x = F.relu(x) x = self.dropout2(x) x = self.fc2(x) output = F.log_softmax(x, dim=1) return output
接下来,让我们定义一些有用的辅助函数,这些函数将对我们脚本的其余部分很有用。以下使用rpc_sync和RRef来定义一个函数,该函数在远程节点上调用给定对象的方法。在下面,我们对远程对象的句柄由rref
参数给出,并在拥有节点上运行它:rref.owner()
。在调用节点上,我们通过使用rpc_sync
同步运行此命令,这意味着我们将阻塞直到收到响应。
# --------- Helper Methods -------------------- # On the local node, call a method with first arg as the value held by the # RRef. Other args are passed in as arguments to the function called. # Useful for calling instance methods. method could be any matching function, including # class methods. def call_method(method, rref, *args, **kwargs): return method(rref.local_value(), *args, **kwargs) # Given an RRef, return the result of calling the passed in method on the value # held by the RRef. This call is done on the remote node that owns # the RRef and passes along the given argument. # Example: If the value held by the RRef is of type Foo, then # remote_method(Foo.bar, rref, arg1, arg2) is equivalent to calling # <foo_instance>.bar(arg1, arg2) on the remote node and getting the result # back. def remote_method(method, rref, *args, **kwargs): args = [method, rref] + list(args) return rpc.rpc_sync(rref.owner(), call_method, args=args, kwargs=kwargs)
现在,我们准备定义我们的参数服务器。我们将子类化nn.Module
并保存一个句柄到我们上面定义的网络。我们还将保存一个输入设备,这将是在调用模型之前将输入传输到的设备。
# --------- Parameter Server -------------------- class ParameterServer(nn.Module): def __init__(self, num_gpus=0): super().__init__() model = Net(num_gpus=num_gpus) self.model = model self.input_device = torch.device( "cuda:0" if torch.cuda.is_available() and num_gpus > 0 else "cpu")
接下来,我们将定义我们的前向传递。请注意,无论模型输出的设备如何,我们都将输出移动到 CPU,因为分布式 RPC 框架目前仅支持通过 RPC 发送 CPU 张量。由于调用方/被调用方可能存在不同设备(CPU/GPU),我们故意禁用了通过 RPC 发送 CUDA 张量,但可能会在未来版本中支持。
class ParameterServer(nn.Module): ... def forward(self, inp): inp = inp.to(self.input_device) out = self.model(inp) # This output is forwarded over RPC, which as of 1.5.0 only accepts CPU tensors. # Tensors must be moved in and out of GPU memory due to this. out = out.to("cpu") return out
接下来,我们将定义一些对训练和验证有用的杂项函数。首先,get_dist_gradients
将接收一个分布式自动求导上下文 ID,并调用dist_autograd.get_gradients
API 来检索分布式自动求导计算的梯度。更多信息可以在分布式自动求导文档中找到。请注意,我们还会遍历结果字典,并将每个张量转换为 CPU 张量,因为目前框架只支持通过 RPC 发送张量。接下来,get_param_rrefs
将遍历我们的模型参数,并将它们包装为一个(本地)RRef。这个方法将被训练节点通过 RPC 调用,并返回要优化的参数列表。这是Distributed Optimizer的输入要求,它要求所有必须优化的参数作为RRef
列表。
# Use dist autograd to retrieve gradients accumulated for this model. # Primarily used for verification. def get_dist_gradients(self, cid): grads = dist_autograd.get_gradients(cid) # This output is forwarded over RPC, which as of 1.5.0 only accepts CPU tensors. # Tensors must be moved in and out of GPU memory due to this. cpu_grads = {} for k, v in grads.items(): k_cpu, v_cpu = k.to("cpu"), v.to("cpu") cpu_grads[k_cpu] = v_cpu return cpu_grads # Wrap local parameters in a RRef. Needed for building the # DistributedOptimizer which optimizes paramters remotely. def get_param_rrefs(self): param_rrefs = [rpc.RRef(param) for param in self.model.parameters()] return param_rrefs
最后,我们将创建方法来初始化我们的参数服务器。请注意,在所有进程中只会有一个参数服务器实例,并且所有训练器将与同一个参数服务器通信并更新相同的存储模型。如run_parameter_server
中所示,服务器本身不会采取任何独立的行动;它会等待来自训练器(尚未定义)的请求,并通过运行请求的函数来响应它们。
# The global parameter server instance. param_server = None # A lock to ensure we only have one parameter server. global_lock = Lock() def get_parameter_server(num_gpus=0): """ Returns a singleton parameter server to all trainer processes """ global param_server # Ensure that we get only one handle to the ParameterServer. with global_lock: if not param_server: # construct it once param_server = ParameterServer(num_gpus=num_gpus) return param_server def run_parameter_server(rank, world_size): # The parameter server just acts as a host for the model and responds to # requests from trainers. # rpc.shutdown() will wait for all workers to complete by default, which # in this case means that the parameter server will wait for all trainers # to complete, and then exit. print("PS master initializing RPC") rpc.init_rpc(name="parameter_server", rank=rank, world_size=world_size) print("RPC initialized! Running parameter server...") rpc.shutdown() print("RPC shutdown on parameter server.")
请注意,上面的rpc.shutdown()
不会立即关闭参数服务器。相反,它将等待所有工作节点(在这种情况下是训练器)也调用rpc.shutdown()
。这样我们就可以保证在所有训练器(尚未定义)完成训练过程之前,参数服务器不会下线。
接下来,我们将定义我们的TrainerNet
类。这也将是nn.Module
的子类,我们的__init__
方法将使用rpc.remote
API 来获取一个 RRef,或者远程引用,到我们的参数服务器。请注意,这里我们不会将参数服务器复制到我们的本地进程,相反,我们可以将self.param_server_rref
看作是指向在单独进程中运行的参数服务器的分布式共享指针。
# --------- Trainers -------------------- # nn.Module corresponding to the network trained by this trainer. The # forward() method simply invokes the network on the given parameter # server. class TrainerNet(nn.Module): def __init__(self, num_gpus=0): super().__init__() self.num_gpus = num_gpus self.param_server_rref = rpc.remote( "parameter_server", get_parameter_server, args=(num_gpus,))
接下来,我们将定义一个名为get_global_param_rrefs
的方法。为了激发对这个方法的需求,值得阅读DistributedOptimizer上的文档,特别是 API 签名。优化器必须传递一个要优化的远程参数的RRef
列表,所以这里我们获取必要的RRef
。由于给定的TrainerNet
与唯一的远程工作节点ParameterServer
进行交互,我们只需在ParameterServer
上调用remote_method
。我们使用在ParameterServer
类中定义的get_param_rrefs
方法。这个方法将返回一个需要被优化的参数的RRef
列表。请注意,在这种情况下,我们的TrainerNet
不定义自己的参数;如果定义了,我们还需要将每个参数包装成一个RRef
,并将其包含在输入到DistributedOptimizer
中。
class TrainerNet(nn.Module): ... def get_global_param_rrefs(self): remote_params = remote_method( ParameterServer.get_param_rrefs, self.param_server_rref) return remote_params
现在,我们准备定义我们的forward
方法,它将调用(同步)RPC 来运行在ParameterServer
上定义的网络的前向传播。请注意,我们传入self.param_server_rref
,这是对我们的ParameterServer
的远程句柄,到我们的 RPC 调用中。这个调用将发送一个 RPC 到我们的ParameterServer
正在运行的节点上,调用forward
传播,并返回对应于模型输出的Tensor
。
class TrainerNet(nn.Module): ... def forward(self, x): model_output = remote_method( ParameterServer.forward, self.param_server_rref, x) return model_output
我们的训练器已经完全定义好了,现在是时候编写我们的神经网络训练循环,该循环将创建我们的网络和优化器,运行一些输入通过网络并计算损失。训练循环看起来很像本地训练程序的循环,但由于我们的网络分布在多台机器上,所以有一些修改。
在下面,我们初始化我们的TrainerNet
并构建一个DistributedOptimizer
。请注意,如上所述,我们必须传入所有全局(参与分布式训练的所有节点)参数,我们希望进行优化。此外,我们传入要使用的本地优化器,本例中为 SGD。请注意,我们可以像创建本地优化器一样配置底层优化算法 - 所有optimizer.SGD
的参数都将被正确转发。例如,我们传入一个自定义学习率,该学习率将用作所有本地优化器的学习率。
def run_training_loop(rank, num_gpus, train_loader, test_loader): # Runs the typical nueral network forward + backward + optimizer step, but # in a distributed fashion. net = TrainerNet(num_gpus=num_gpus) # Build DistributedOptimizer. param_rrefs = net.get_global_param_rrefs() opt = DistributedOptimizer(optim.SGD, param_rrefs, lr=0.03)
接下来,我们定义我们的主要训练循环。我们循环遍历 PyTorch 的DataLoader提供的可迭代对象。在编写典型的前向/后向/优化器循环之前,我们首先将逻辑包装在Distributed Autograd context中。请注意,这是为了记录模型前向传递中调用的 RPC,以便构建一个适当的图,其中包括在后向传递中参与的所有分布式工作节点。分布式自动求导上下文返回一个context_id
,用作累积和优化与特定迭代对应的梯度的标识符。
与调用典型的loss.backward()
不同,后者会在本地工作节点上启动后向传递,我们调用dist_autograd.backward()
并传入我们的context_id
以及loss
,这是我们希望从根开始进行后向传递的位置。此外,我们将这个context_id
传递给我们的优化器调用,这是必要的,以便能够查找由此特定后向传递计算的相应梯度跨所有节点。
def run_training_loop(rank, num_gpus, train_loader, test_loader): ... for i, (data, target) in enumerate(train_loader): with dist_autograd.context() as cid: model_output = net(data) target = target.to(model_output.device) loss = F.nll_loss(model_output, target) if i % 5 == 0: print(f"Rank {rank} training batch {i} loss {loss.item()}") dist_autograd.backward(cid, [loss]) # Ensure that dist autograd ran successfully and gradients were # returned. assert remote_method( ParameterServer.get_dist_gradients, net.param_server_rref, cid) != {} opt.step(cid) print("Training complete!") print("Getting accuracy....") get_accuracy(test_loader, net)
接下来,我们简单地计算模型在训练完成后的准确率,就像传统的本地模型一样。但是,请注意,我们在上面传递给此函数的net
是TrainerNet
的一个实例,因此前向传递以透明方式调用 RPC。
def get_accuracy(test_loader, model): model.eval() correct_sum = 0 # Use GPU to evaluate if possible device = torch.device("cuda:0" if model.num_gpus > 0 and torch.cuda.is_available() else "cpu") with torch.no_grad(): for i, (data, target) in enumerate(test_loader): out = model(data, -1) pred = out.argmax(dim=1, keepdim=True) pred, target = pred.to(device), target.to(device) correct = pred.eq(target.view_as(pred)).sum().item() correct_sum += correct print(f"Accuracy {correct_sum / len(test_loader.dataset)}")
接下来,类似于我们为ParameterServer
定义run_parameter_server
作为主循环的方式,负责初始化 RPC,让我们为训练器定义一个类似的循环。不同之处在于我们的训练器必须运行我们上面定义的训练循环:
# Main loop for trainers. def run_worker(rank, world_size, num_gpus, train_loader, test_loader): print(f"Worker rank {rank} initializing RPC") rpc.init_rpc( name=f"trainer_{rank}", rank=rank, world_size=world_size) print(f"Worker {rank} done initializing RPC") run_training_loop(rank, num_gpus, train_loader, test_loader) rpc.shutdown()
请注意,类似于run_parameter_server
,rpc.shutdown()
默认情况下会等待所有工作节点,包括训练器和参数服务器,调用rpc.shutdown()
后,该节点才会退出。这确保节点被优雅地终止,而不会在另一个节点期望其在线时离线。
我们现在已经完成了训练器和参数服务器特定的代码,剩下的就是添加代码来启动训练器和参数服务器。首先,我们必须接收适用于我们参数服务器和训练器的各种参数。world_size
对应于将参与训练的节点的总数,是所有训练器和参数服务器的总和。我们还必须为每个单独的进程传入一个唯一的rank
,从 0(我们将在其中运行单个参数服务器)到world_size - 1
。master_addr
和master_port
是可以用来识别 0 级进程运行位置的参数,并将被各个节点用于发现彼此。为了在本地测试此示例,只需将localhost
和相同的master_port
传递给所有生成的实例。请注意,出于演示目的,此示例仅支持 0-2 个 GPU,尽管该模式可以扩展以利用更多的 GPU。
if __name__ == '__main__': parser = argparse.ArgumentParser( description="Parameter-Server RPC based training") parser.add_argument( "--world_size", type=int, default=4, help="""Total number of participating processes. Should be the sum of master node and all training nodes.""") parser.add_argument( "rank", type=int, default=None, help="Global rank of this process. Pass in 0 for master.") parser.add_argument( "num_gpus", type=int, default=0, help="""Number of GPUs to use for training, Currently supports between 0 and 2 GPUs. Note that this argument will be passed to the parameter servers.""") parser.add_argument( "--master_addr", type=str, default="localhost", help="""Address of master, will default to localhost if not provided. Master must be able to accept network traffic on the address + port.""") parser.add_argument( "--master_port", type=str, default="29500", help="""Port that master is listening on, will default to 29500 if not provided. Master must be able to accept network traffic on the host and port.""") args = parser.parse_args() assert args.rank is not None, "must provide rank argument." assert args.num_gpus <= 3, f"Only 0-2 GPUs currently supported (got {args.num_gpus})." os.environ['MASTER_ADDR'] = args.master_addr os.environ["MASTER_PORT"] = args.master_port
现在,我们将根据我们的命令行参数创建相应于参数服务器或训练器的进程。如果我们传入的等级为 0,则将创建一个ParameterServer
,否则将创建一个TrainerNet
。请注意,我们使用torch.multiprocessing
启动一个子进程,对应于我们要执行的函数,并在主线程中使用p.join()
等待该进程的完成。在初始化我们的训练器时,我们还使用 PyTorch 的dataloaders来指定 MNIST 数据集上的训练和测试数据加载器。
processes = [] world_size = args.world_size if args.rank == 0: p = mp.Process(target=run_parameter_server, args=(0, world_size)) p.start() processes.append(p) else: # Get data to train on train_loader = torch.utils.data.DataLoader( datasets.MNIST('../data', train=True, download=True, transform=transforms.Compose([ transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,)) ])), batch_size=32, shuffle=True,) test_loader = torch.utils.data.DataLoader( datasets.MNIST( '../data', train=False, transform=transforms.Compose([ transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,)) ])), batch_size=32, shuffle=True, ) # start training worker on this node p = mp.Process( target=run_worker, args=( args.rank, world_size, args.num_gpus, train_loader, test_loader)) p.start() processes.append(p) for p in processes: p.join()
要在本地运行示例,请在单独的终端窗口中为服务器和每个要生成的工作节点运行以下命令:python rpc_parameter_server.py --world_size=WORLD_SIZE --rank=RANK
。例如,对于世界大小为 2 的主节点,命令将是python rpc_parameter_server.py --world_size=2 --rank=0
。然后可以在单独的窗口中使用命令python rpc_parameter_server.py --world_size=2 --rank=1
启动训练器,这将开始使用一个服务器和一个训练器进行训练。请注意,本教程假定训练使用 0 到 2 个 GPU 进行,可以通过将--num_gpus=N
传递到训练脚本中进行配置。
您可以通过命令行参数--master_addr=ADDRESS
和--master_port=PORT
传入地址和端口,以指示主工作节点正在侦听的地址和端口,例如,用于测试训练器和主节点在不同机器上运行的功能。