PyTorch 2.2 中文官方教程(十八)(2)https://developer.aliyun.com/article/1482606
使用分布式 RPC 框架入门
原文:
pytorch.org/tutorials/intermediate/rpc_tutorial.html
译者:飞龙
作者:Shen Li
注意
在github中查看和编辑本教程。
先决条件:
- PyTorch 分布式概述
- RPC API 文档
本教程使用两个简单示例演示如何使用torch.distributed.rpc包构建分布式训练,该包最初作为 PyTorch v1.4 中的实验性功能引入。这两个示例的源代码可以在PyTorch 示例中找到。
之前的教程,使用分布式数据并行开始和使用 PyTorch 编写分布式应用程序,描述了分布式数据并行,支持一种特定的训练范式,其中模型在多个进程中复制,并且每个进程处理输入数据的一个部分。有时,您可能会遇到需要不同训练范式的情况。例如:
- 在强化学习中,从环境中获取训练数据可能相对昂贵,而模型本身可能非常小。在这种情况下,可能有用的是并行运行多个观察者并共享单个代理。在这种情况下,代理在本地处理训练,但应用程序仍需要库来在观察者和训练者之间发送和接收数据。
- 您的模型可能太大,无法适应单台机器上的 GPU,因此需要一个库来将模型分割到多台机器上。或者您可能正在实现一个参数服务器训练框架,其中模型参数和训练器位于不同的机器上。
torch.distributed.rpc包可以帮助处理上述情况。在情况 1 中,RPC和RRef允许从一个工作进程发送数据到另一个工作进程,同时轻松引用远程数据对象。在情况 2 中,分布式自动求导和分布式优化器使得执行反向传播和优化器步骤就像是本地训练一样。在接下来的两个部分中,我们将使用一个强化学习示例和一个语言模型示例演示torch.distributed.rpc的 API。请注意,本教程的目标不是构建最准确或高效的模型来解决给定问题,而是展示如何使用torch.distributed.rpc包构建分布式训练应用程序。
使用 RPC 和 RRef 进行分布式强化学习
本部分描述了使用 RPC 构建玩具分布式强化学习模型的步骤,以解决来自OpenAI Gym的 CartPole-v1 问题。策略代码大部分是从现有的单线程示例中借用的,如下所示。我们将跳过“策略”设计的细节,重点放在 RPC 的用法上。
import torch.nn as nn import torch.nn.functional as F class Policy(nn.Module): def __init__(self): super(Policy, self).__init__() self.affine1 = nn.Linear(4, 128) self.dropout = nn.Dropout(p=0.6) self.affine2 = nn.Linear(128, 2) def forward(self, x): x = self.affine1(x) x = self.dropout(x) x = F.relu(x) action_scores = self.affine2(x) return F.softmax(action_scores, dim=1)
我们准备展示观察者。在这个例子中,每个观察者都创建自己的环境,并等待代理的命令来运行一个剧集。在每个剧集中,一个观察者最多循环n_steps
次迭代,在每次迭代中,它使用 RPC 将其环境状态传递给代理,并获得一个动作。然后将该动作应用于其环境,并从环境中获得奖励和下一个状态。之后,观察者使用另一个 RPC 向代理报告奖励。再次请注意,这显然不是最有效的观察者实现。例如,一个简单的优化可以是将当前状态和上一个奖励打包在一个 RPC 中,以减少通信开销。然而,目标是演示 RPC API 而不是构建 CartPole 的最佳求解器。因此,在这个例子中,让我们保持逻辑简单,将这两个步骤明确表示。
import argparse import gym import torch.distributed.rpc as rpc parser = argparse.ArgumentParser( description="RPC Reinforcement Learning Example", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument('--world_size', default=2, type=int, metavar='W', help='number of workers') parser.add_argument('--log_interval', type=int, default=10, metavar='N', help='interval between training status logs') parser.add_argument('--gamma', type=float, default=0.99, metavar='G', help='how much to value future rewards') parser.add_argument('--seed', type=int, default=1, metavar='S', help='random seed for reproducibility') args = parser.parse_args() class Observer: def __init__(self): self.id = rpc.get_worker_info().id self.env = gym.make('CartPole-v1') self.env.seed(args.seed) def run_episode(self, agent_rref): state, ep_reward = self.env.reset(), 0 for _ in range(10000): # send the state to the agent to get an action action = agent_rref.rpc_sync().select_action(self.id, state) # apply the action to the environment, and get the reward state, reward, done, _ = self.env.step(action) # report the reward to the agent for training purpose agent_rref.rpc_sync().report_reward(self.id, reward) # finishes after the number of self.env._max_episode_steps if done: break
代理的代码稍微复杂一些,我们将其分解为多个部分。在这个例子中,代理既充当训练者又充当主控,它向多个分布式观察者发送命令来运行剧集,并在本地记录所有动作和奖励,这些将在每个剧集后的训练阶段中使用。下面的代码显示了Agent
构造函数,其中大多数行都在初始化各种组件。最后的循环在其他工作进程上远程初始化观察者,并在本地保存这些观察者的RRefs
。代理将在稍后使用这些观察者的RRefs
来发送命令。应用程序不需要担心RRefs
的生命周期。每个RRef
的所有者维护一个引用计数映射来跟踪其生命周期,并保证只要有任何RRef
的活动用户,远程数据对象就不会被删除。有关详细信息,请参阅RRef
设计文档。
import gym import numpy as np import torch import torch.distributed.rpc as rpc import torch.optim as optim from torch.distributed.rpc import RRef, rpc_async, remote from torch.distributions import Categorical class Agent: def __init__(self, world_size): self.ob_rrefs = [] self.agent_rref = RRef(self) self.rewards = {} self.saved_log_probs = {} self.policy = Policy() self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2) self.eps = np.finfo(np.float32).eps.item() self.running_reward = 0 self.reward_threshold = gym.make('CartPole-v1').spec.reward_threshold for ob_rank in range(1, world_size): ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank)) self.ob_rrefs.append(remote(ob_info, Observer)) self.rewards[ob_info.id] = [] self.saved_log_probs[ob_info.id] = []
接下来,代理向观察者公开两个 API,用于选择动作和报告奖励。这些函数仅在代理上本地运行,但将通过 RPC 由观察者触发。
class Agent: ... def select_action(self, ob_id, state): state = torch.from_numpy(state).float().unsqueeze(0) probs = self.policy(state) m = Categorical(probs) action = m.sample() self.saved_log_probs[ob_id].append(m.log_prob(action)) return action.item() def report_reward(self, ob_id, reward): self.rewards[ob_id].append(reward)
让我们在代理上添加一个run_episode
函数,告诉所有观察者执行一个剧集。在这个函数中,首先创建一个列表来收集异步 RPC 的 futures,然后循环遍历所有观察者的RRefs
来进行异步 RPC。在这些 RPC 中,代理还将自身的RRef
传递给观察者,以便观察者也可以在代理上调用函数。如上所示,每个观察者将向代理发起 RPC,这些是嵌套的 RPC。每个剧集结束后,saved_log_probs
和rewards
将包含记录的动作概率和奖励。
class Agent: ... def run_episode(self): futs = [] for ob_rref in self.ob_rrefs: # make async RPC to kick off an episode on all observers futs.append( rpc_async( ob_rref.owner(), ob_rref.rpc_sync().run_episode, args=(self.agent_rref,) ) ) # wait until all obervers have finished this episode for fut in futs: fut.wait()
最后,在一个剧集结束后,代理需要训练模型,这在下面的finish_episode
函数中实现。这个函数中没有 RPC,它主要是从单线程的示例中借用的。因此,我们跳过描述其内容。
class Agent: ... def finish_episode(self): # joins probs and rewards from different observers into lists R, probs, rewards = 0, [], [] for ob_id in self.rewards: probs.extend(self.saved_log_probs[ob_id]) rewards.extend(self.rewards[ob_id]) # use the minimum observer reward to calculate the running reward min_reward = min([sum(self.rewards[ob_id]) for ob_id in self.rewards]) self.running_reward = 0.05 * min_reward + (1 - 0.05) * self.running_reward # clear saved probs and rewards for ob_id in self.rewards: self.rewards[ob_id] = [] self.saved_log_probs[ob_id] = [] policy_loss, returns = [], [] for r in rewards[::-1]: R = r + args.gamma * R returns.insert(0, R) returns = torch.tensor(returns) returns = (returns - returns.mean()) / (returns.std() + self.eps) for log_prob, R in zip(probs, returns): policy_loss.append(-log_prob * R) self.optimizer.zero_grad() policy_loss = torch.cat(policy_loss).sum() policy_loss.backward() self.optimizer.step() return min_reward
有了Policy
、Observer
和Agent
类,我们准备启动多个进程执行分布式训练。在这个例子中,所有进程都运行相同的run_worker
函数,并使用排名来区分它们的角色。排名 0 始终是代理,所有其他排名都是观察者。代理通过反复调用run_episode
和finish_episode
来充当主控,直到运行奖励超过环境指定的奖励阈值。所有观察者都 passively 等待代理的命令。代码由rpc.init_rpc和rpc.shutdown包装,分别初始化和终止 RPC 实例。更多细节请参阅API 页面。
import os from itertools import count import torch.multiprocessing as mp AGENT_NAME = "agent" OBSERVER_NAME="obs{}" def run_worker(rank, world_size): os.environ['MASTER_ADDR'] = 'localhost' os.environ['MASTER_PORT'] = '29500' if rank == 0: # rank0 is the agent rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size) agent = Agent(world_size) print(f"This will run until reward threshold of {agent.reward_threshold}" " is reached. Ctrl+C to exit.") for i_episode in count(1): agent.run_episode() last_reward = agent.finish_episode() if i_episode % args.log_interval == 0: print(f"Episode {i_episode}\tLast reward: {last_reward:.2f}\tAverage reward: " f"{agent.running_reward:.2f}") if agent.running_reward > agent.reward_threshold: print(f"Solved! Running reward is now {agent.running_reward}!") break else: # other ranks are the observer rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size) # observers passively waiting for instructions from the agent # block until all rpcs finish, and shutdown the RPC instance rpc.shutdown() mp.spawn( run_worker, args=(args.world_size, ), nprocs=args.world_size, join=True )
以下是在 world_size=2 时进行训练时的一些示例输出。
This will run until reward threshold of 475.0 is reached. Ctrl+C to exit. Episode 10 Last reward: 26.00 Average reward: 10.01 Episode 20 Last reward: 16.00 Average reward: 11.27 Episode 30 Last reward: 49.00 Average reward: 18.62 Episode 40 Last reward: 45.00 Average reward: 26.09 Episode 50 Last reward: 44.00 Average reward: 30.03 Episode 60 Last reward: 111.00 Average reward: 42.23 Episode 70 Last reward: 131.00 Average reward: 70.11 Episode 80 Last reward: 87.00 Average reward: 76.51 Episode 90 Last reward: 86.00 Average reward: 95.93 Episode 100 Last reward: 13.00 Average reward: 123.93 Episode 110 Last reward: 33.00 Average reward: 91.39 Episode 120 Last reward: 73.00 Average reward: 76.38 Episode 130 Last reward: 137.00 Average reward: 88.08 Episode 140 Last reward: 89.00 Average reward: 104.96 Episode 150 Last reward: 97.00 Average reward: 98.74 Episode 160 Last reward: 150.00 Average reward: 100.87 Episode 170 Last reward: 126.00 Average reward: 104.38 Episode 180 Last reward: 500.00 Average reward: 213.74 Episode 190 Last reward: 322.00 Average reward: 300.22 Episode 200 Last reward: 165.00 Average reward: 272.71 Episode 210 Last reward: 168.00 Average reward: 233.11 Episode 220 Last reward: 184.00 Average reward: 195.02 Episode 230 Last reward: 284.00 Average reward: 208.32 Episode 240 Last reward: 395.00 Average reward: 247.37 Episode 250 Last reward: 500.00 Average reward: 335.42 Episode 260 Last reward: 500.00 Average reward: 386.30 Episode 270 Last reward: 500.00 Average reward: 405.29 Episode 280 Last reward: 500.00 Average reward: 443.29 Episode 290 Last reward: 500.00 Average reward: 464.65 Solved! Running reward is now 475.3163778435275! • 31
在这个例子中,我们展示了如何使用 RPC 作为通信工具在工作器之间传递数据,以及如何使用 RRef 引用远程对象。当然,您可以直接在ProcessGroup
send
和recv
API 之上构建整个结构,或者使用其他通信/RPC 库。然而,通过使用 torch.distributed.rpc,您可以获得本地支持,并在幕后持续优化性能。
接下来,我们将展示如何结合 RPC 和 RRef 与分布式自动求导和分布式优化器来执行分布式模型并行训练。
使用分布式自动求导和分布式优化器的分布式 RNN
在本节中,我们使用一个 RNN 模型来展示如何使用 RPC API 构建分布式模型并行训练。示例 RNN 模型非常小,可以轻松适应单个 GPU,但我们仍将其层分布到两个不同的工作器上以演示这个想法。开发人员可以应用类似的技术将更大的模型分布到多个设备和机器上。
RNN 模型设计借鉴了 PyTorch 示例 仓库中的单词语言模型,其中包含三个主要组件,一个嵌入表,一个LSTM
层和一个解码器。下面的代码将嵌入表和解码器包装成子模块,以便它们的构造函数可以传递给 RPC API。在EmbeddingTable
子模块中,我们故意将Embedding
层放在 GPU 上以涵盖使用情况。在 v1.4 中,RPC 始终在目标工作器上创建 CPU 张量参数或返回值。如果函数接受 GPU 张量,则需要显式将其移动到适当的设备上。
class EmbeddingTable(nn.Module): r""" Encoding layers of the RNNModel """ def __init__(self, ntoken, ninp, dropout): super(EmbeddingTable, self).__init__() self.drop = nn.Dropout(dropout) self.encoder = nn.Embedding(ntoken, ninp).cuda() self.encoder.weight.data.uniform_(-0.1, 0.1) def forward(self, input): return self.drop(self.encoder(input.cuda()).cpu() class Decoder(nn.Module): def __init__(self, ntoken, nhid, dropout): super(Decoder, self).__init__() self.drop = nn.Dropout(dropout) self.decoder = nn.Linear(nhid, ntoken) self.decoder.bias.data.zero_() self.decoder.weight.data.uniform_(-0.1, 0.1) def forward(self, output): return self.decoder(self.drop(output))
通过上述子模块,我们现在可以使用 RPC 将它们组合在一起创建一个 RNN 模型。在下面的代码中,ps
代表参数服务器,它承载嵌入表和解码器的参数。构造函数使用remote API 在参数服务器上创建一个EmbeddingTable
对象和一个Decoder
对象,并在本地创建LSTM
子模块。在前向传播过程中,训练器使用EmbeddingTable
的RRef
来找到远程子模块,并通过 RPC 将输入数据传递给EmbeddingTable
并获取查找结果。然后,它通过本地的LSTM
层运行嵌入,最后使用另一个 RPC 将输出发送到Decoder
子模块。通常,为了实现分布式模型并行训练,开发人员可以将模型划分为子模块,调用 RPC 远程创建子模块实例,并在必要时使用RRef
来找到它们。正如您在下面的代码中所看到的,它看起来非常类似于单机模型并行训练。主要区别是用 RPC 函数替换Tensor.to(device)
。
class RNNModel(nn.Module): def __init__(self, ps, ntoken, ninp, nhid, nlayers, dropout=0.5): super(RNNModel, self).__init__() # setup embedding table remotely self.emb_table_rref = rpc.remote(ps, EmbeddingTable, args=(ntoken, ninp, dropout)) # setup LSTM locally self.rnn = nn.LSTM(ninp, nhid, nlayers, dropout=dropout) # setup decoder remotely self.decoder_rref = rpc.remote(ps, Decoder, args=(ntoken, nhid, dropout)) def forward(self, input, hidden): # pass input to the remote embedding table and fetch emb tensor back emb = _remote_method(EmbeddingTable.forward, self.emb_table_rref, input) output, hidden = self.rnn(emb, hidden) # pass output to the rremote decoder and get the decoded output back decoded = _remote_method(Decoder.forward, self.decoder_rref, output) return decoded, hidden
在介绍分布式优化器之前,让我们添加一个辅助函数来生成模型参数的 RRef 列表,这将被分布式优化器使用。在本地训练中,应用程序可以调用Module.parameters()
来获取所有参数张量的引用,并将其传递给本地优化器进行后续更新。然而,在分布式训练场景中,相同的 API 不起作用,因为一些参数存在于远程机器上。因此,分布式优化器不是接受参数Tensors
列表,而是接受RRefs
列表,每个模型参数都有一个RRef
,用于本地和远程模型参数。辅助函数非常简单,只需调用Module.parameters()
并在每个参数上创建一个本地RRef
。
def _parameter_rrefs(module): param_rrefs = [] for param in module.parameters(): param_rrefs.append(RRef(param)) return param_rrefs
然后,由于RNNModel
包含三个子模块,我们需要三次调用_parameter_rrefs
,并将其包装到另一个辅助函数中。
class RNNModel(nn.Module): ... def parameter_rrefs(self): remote_params = [] # get RRefs of embedding table remote_params.extend(_remote_method(_parameter_rrefs, self.emb_table_rref)) # create RRefs for local parameters remote_params.extend(_parameter_rrefs(self.rnn)) # get RRefs of decoder remote_params.extend(_remote_method(_parameter_rrefs, self.decoder_rref)) return remote_params
现在,我们准备实现训练循环。在初始化模型参数后,我们创建RNNModel
和DistributedOptimizer
。分布式优化器将获取参数RRefs
列表,找到所有不同的所有者工作节点,并使用给定参数(即,在本例中为lr=0.05
)在每个所有者工作节点上创建给定的本地优化器(即SGD
,您也可以使用其他本地优化器)。
在训练循环中,首先创建一个分布式自动求导上下文,这将帮助分布式自动求导引擎找到梯度和涉及的 RPC 发送/接收函数。分布式自动求导引擎的设计细节可以在其设计说明中找到。然后,启动前向传播,就像是一个本地模型,然后运行分布式反向传播。对于分布式反向传播,您只需要指定一个根列表,在本例中,它是损失Tensor
。分布式自动求导引擎将自动遍历分布式图并正确写入梯度。接下来,在分布式优化器上运行step
函数,这将联系到所有涉及的本地优化器来更新模型参数。与本地训练相比,一个小的区别是您不需要运行zero_grad()
,因为每个自动求导上下文都有专用空间来存储梯度,并且由于我们每次迭代创建一个上下文,来自不同迭代的梯度不会累积到相同的Tensors
集合中。
def run_trainer(): batch = 5 ntoken = 10 ninp = 2 nhid = 3 nindices = 3 nlayers = 4 hidden = ( torch.randn(nlayers, nindices, nhid), torch.randn(nlayers, nindices, nhid) ) model = rnn.RNNModel('ps', ntoken, ninp, nhid, nlayers) # setup distributed optimizer opt = DistributedOptimizer( optim.SGD, model.parameter_rrefs(), lr=0.05, ) criterion = torch.nn.CrossEntropyLoss() def get_next_batch(): for _ in range(5): data = torch.LongTensor(batch, nindices) % ntoken target = torch.LongTensor(batch, ntoken) % nindices yield data, target # train for 10 iterations for epoch in range(10): for data, target in get_next_batch(): # create distributed autograd context with dist_autograd.context() as context_id: hidden[0].detach_() hidden[1].detach_() output, hidden = model(data, hidden) loss = criterion(output, target) # run distributed backward pass dist_autograd.backward(context_id, [loss]) # run distributed optimizer opt.step(context_id) # not necessary to zero grads since they are # accumulated into the distributed autograd context # which is reset every iteration. print("Training epoch {}".format(epoch))
最后,让我们添加一些粘合代码来启动参数服务器和训练器进程。
def run_worker(rank, world_size): os.environ['MASTER_ADDR'] = 'localhost' os.environ['MASTER_PORT'] = '29500' if rank == 1: rpc.init_rpc("trainer", rank=rank, world_size=world_size) _run_trainer() else: rpc.init_rpc("ps", rank=rank, world_size=world_size) # parameter server do nothing pass # block until all rpcs finish rpc.shutdown() if __name__=="__main__": world_size = 2 mp.spawn(run_worker, args=(world_size, ), nprocs=world_size, join=True)
PyTorch 2.2 中文官方教程(十八)(4)https://developer.aliyun.com/article/1482610