PyTorch 2.2 中文官方教程(十八)(3)

简介: PyTorch 2.2 中文官方教程(十八)

PyTorch 2.2 中文官方教程(十八)(2)https://developer.aliyun.com/article/1482606

使用分布式 RPC 框架入门

原文:pytorch.org/tutorials/intermediate/rpc_tutorial.html

译者:飞龙

协议:CC BY-NC-SA 4.0

作者Shen Li

注意

github中查看和编辑本教程。

先决条件:

本教程使用两个简单示例演示如何使用torch.distributed.rpc包构建分布式训练,该包最初作为 PyTorch v1.4 中的实验性功能引入。这两个示例的源代码可以在PyTorch 示例中找到。

之前的教程,使用分布式数据并行开始和使用 PyTorch 编写分布式应用程序,描述了分布式数据并行,支持一种特定的训练范式,其中模型在多个进程中复制,并且每个进程处理输入数据的一个部分。有时,您可能会遇到需要不同训练范式的情况。例如:

  1. 在强化学习中,从环境中获取训练数据可能相对昂贵,而模型本身可能非常小。在这种情况下,可能有用的是并行运行多个观察者并共享单个代理。在这种情况下,代理在本地处理训练,但应用程序仍需要库来在观察者和训练者之间发送和接收数据。
  2. 您的模型可能太大,无法适应单台机器上的 GPU,因此需要一个库来将模型分割到多台机器上。或者您可能正在实现一个参数服务器训练框架,其中模型参数和训练器位于不同的机器上。

torch.distributed.rpc包可以帮助处理上述情况。在情况 1 中,RPCRRef允许从一个工作进程发送数据到另一个工作进程,同时轻松引用远程数据对象。在情况 2 中,分布式自动求导分布式优化器使得执行反向传播和优化器步骤就像是本地训练一样。在接下来的两个部分中,我们将使用一个强化学习示例和一个语言模型示例演示torch.distributed.rpc的 API。请注意,本教程的目标不是构建最准确或高效的模型来解决给定问题,而是展示如何使用torch.distributed.rpc包构建分布式训练应用程序。

使用 RPC 和 RRef 进行分布式强化学习

本部分描述了使用 RPC 构建玩具分布式强化学习模型的步骤,以解决来自OpenAI Gym的 CartPole-v1 问题。策略代码大部分是从现有的单线程示例中借用的,如下所示。我们将跳过“策略”设计的细节,重点放在 RPC 的用法上。

import torch.nn as nn
import torch.nn.functional as F
class Policy(nn.Module):
    def __init__(self):
        super(Policy, self).__init__()
        self.affine1 = nn.Linear(4, 128)
        self.dropout = nn.Dropout(p=0.6)
        self.affine2 = nn.Linear(128, 2)
    def forward(self, x):
        x = self.affine1(x)
        x = self.dropout(x)
        x = F.relu(x)
        action_scores = self.affine2(x)
        return F.softmax(action_scores, dim=1) 

我们准备展示观察者。在这个例子中,每个观察者都创建自己的环境,并等待代理的命令来运行一个剧集。在每个剧集中,一个观察者最多循环n_steps次迭代,在每次迭代中,它使用 RPC 将其环境状态传递给代理,并获得一个动作。然后将该动作应用于其环境,并从环境中获得奖励和下一个状态。之后,观察者使用另一个 RPC 向代理报告奖励。再次请注意,这显然不是最有效的观察者实现。例如,一个简单的优化可以是将当前状态和上一个奖励打包在一个 RPC 中,以减少通信开销。然而,目标是演示 RPC API 而不是构建 CartPole 的最佳求解器。因此,在这个例子中,让我们保持逻辑简单,将这两个步骤明确表示。

import argparse
import gym
import torch.distributed.rpc as rpc
parser = argparse.ArgumentParser(
    description="RPC Reinforcement Learning Example",
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument('--world_size', default=2, type=int, metavar='W',
                    help='number of workers')
parser.add_argument('--log_interval', type=int, default=10, metavar='N',
                    help='interval between training status logs')
parser.add_argument('--gamma', type=float, default=0.99, metavar='G',
                    help='how much to value future rewards')
parser.add_argument('--seed', type=int, default=1, metavar='S',
                    help='random seed  for reproducibility')
args = parser.parse_args()
class Observer:
    def __init__(self):
        self.id = rpc.get_worker_info().id
        self.env = gym.make('CartPole-v1')
        self.env.seed(args.seed)
    def run_episode(self, agent_rref):
        state, ep_reward = self.env.reset(), 0
        for _ in range(10000):
            # send the state to the agent to get an action
            action = agent_rref.rpc_sync().select_action(self.id, state)
            # apply the action to the environment, and get the reward
            state, reward, done, _ = self.env.step(action)
            # report the reward to the agent for training purpose
            agent_rref.rpc_sync().report_reward(self.id, reward)
            # finishes after the number of self.env._max_episode_steps
            if done:
                break 

代理的代码稍微复杂一些,我们将其分解为多个部分。在这个例子中,代理既充当训练者又充当主控,它向多个分布式观察者发送命令来运行剧集,并在本地记录所有动作和奖励,这些将在每个剧集后的训练阶段中使用。下面的代码显示了Agent构造函数,其中大多数行都在初始化各种组件。最后的循环在其他工作进程上远程初始化观察者,并在本地保存这些观察者的RRefs。代理将在稍后使用这些观察者的RRefs来发送命令。应用程序不需要担心RRefs的生命周期。每个RRef的所有者维护一个引用计数映射来跟踪其生命周期,并保证只要有任何RRef的活动用户,远程数据对象就不会被删除。有关详细信息,请参阅RRef设计文档

import gym
import numpy as np
import torch
import torch.distributed.rpc as rpc
import torch.optim as optim
from torch.distributed.rpc import RRef, rpc_async, remote
from torch.distributions import Categorical
class Agent:
    def __init__(self, world_size):
        self.ob_rrefs = []
        self.agent_rref = RRef(self)
        self.rewards = {}
        self.saved_log_probs = {}
        self.policy = Policy()
        self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
        self.eps = np.finfo(np.float32).eps.item()
        self.running_reward = 0
        self.reward_threshold = gym.make('CartPole-v1').spec.reward_threshold
        for ob_rank in range(1, world_size):
            ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank))
            self.ob_rrefs.append(remote(ob_info, Observer))
            self.rewards[ob_info.id] = []
            self.saved_log_probs[ob_info.id] = [] 

接下来,代理向观察者公开两个 API,用于选择动作和报告奖励。这些函数仅在代理上本地运行,但将通过 RPC 由观察者触发。

class Agent:
    ...
    def select_action(self, ob_id, state):
        state = torch.from_numpy(state).float().unsqueeze(0)
        probs = self.policy(state)
        m = Categorical(probs)
        action = m.sample()
        self.saved_log_probs[ob_id].append(m.log_prob(action))
        return action.item()
    def report_reward(self, ob_id, reward):
        self.rewards[ob_id].append(reward) 

让我们在代理上添加一个run_episode函数,告诉所有观察者执行一个剧集。在这个函数中,首先创建一个列表来收集异步 RPC 的 futures,然后循环遍历所有观察者的RRefs来进行异步 RPC。在这些 RPC 中,代理还将自身的RRef传递给观察者,以便观察者也可以在代理上调用函数。如上所示,每个观察者将向代理发起 RPC,这些是嵌套的 RPC。每个剧集结束后,saved_log_probsrewards将包含记录的动作概率和奖励。

class Agent:
    ...
    def run_episode(self):
        futs = []
        for ob_rref in self.ob_rrefs:
            # make async RPC to kick off an episode on all observers
            futs.append(
                rpc_async(
                    ob_rref.owner(),
                    ob_rref.rpc_sync().run_episode,
                    args=(self.agent_rref,)
                )
            )
        # wait until all obervers have finished this episode
        for fut in futs:
            fut.wait() 

最后,在一个剧集结束后,代理需要训练模型,这在下面的finish_episode函数中实现。这个函数中没有 RPC,它主要是从单线程的示例中借用的。因此,我们跳过描述其内容。

class Agent:
    ...
    def finish_episode(self):
      # joins probs and rewards from different observers into lists
      R, probs, rewards = 0, [], []
      for ob_id in self.rewards:
          probs.extend(self.saved_log_probs[ob_id])
          rewards.extend(self.rewards[ob_id])
      # use the minimum observer reward to calculate the running reward
      min_reward = min([sum(self.rewards[ob_id]) for ob_id in self.rewards])
      self.running_reward = 0.05 * min_reward + (1 - 0.05) * self.running_reward
      # clear saved probs and rewards
      for ob_id in self.rewards:
          self.rewards[ob_id] = []
          self.saved_log_probs[ob_id] = []
      policy_loss, returns = [], []
      for r in rewards[::-1]:
          R = r + args.gamma * R
          returns.insert(0, R)
      returns = torch.tensor(returns)
      returns = (returns - returns.mean()) / (returns.std() + self.eps)
      for log_prob, R in zip(probs, returns):
          policy_loss.append(-log_prob * R)
      self.optimizer.zero_grad()
      policy_loss = torch.cat(policy_loss).sum()
      policy_loss.backward()
      self.optimizer.step()
      return min_reward 

有了PolicyObserverAgent类,我们准备启动多个进程执行分布式训练。在这个例子中,所有进程都运行相同的run_worker函数,并使用排名来区分它们的角色。排名 0 始终是代理,所有其他排名都是观察者。代理通过反复调用run_episodefinish_episode来充当主控,直到运行奖励超过环境指定的奖励阈值。所有观察者都 passively 等待代理的命令。代码由rpc.init_rpcrpc.shutdown包装,分别初始化和终止 RPC 实例。更多细节请参阅API 页面

import os
from itertools import count
import torch.multiprocessing as mp
AGENT_NAME = "agent"
OBSERVER_NAME="obs{}"
def run_worker(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 0:
        # rank0 is the agent
        rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size)
        agent = Agent(world_size)
        print(f"This will run until reward threshold of {agent.reward_threshold}"
                " is reached. Ctrl+C to exit.")
        for i_episode in count(1):
            agent.run_episode()
            last_reward = agent.finish_episode()
            if i_episode % args.log_interval == 0:
                print(f"Episode {i_episode}\tLast reward: {last_reward:.2f}\tAverage reward: "
                    f"{agent.running_reward:.2f}")
            if agent.running_reward > agent.reward_threshold:
                print(f"Solved! Running reward is now {agent.running_reward}!")
                break
    else:
        # other ranks are the observer
        rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size)
        # observers passively waiting for instructions from the agent
    # block until all rpcs finish, and shutdown the RPC instance
    rpc.shutdown()
mp.spawn(
    run_worker,
    args=(args.world_size, ),
    nprocs=args.world_size,
    join=True
) 

以下是在 world_size=2 时进行训练时的一些示例输出。

This will run until reward threshold of 475.0 is reached. Ctrl+C to exit.
Episode 10      Last reward: 26.00      Average reward: 10.01
Episode 20      Last reward: 16.00      Average reward: 11.27
Episode 30      Last reward: 49.00      Average reward: 18.62
Episode 40      Last reward: 45.00      Average reward: 26.09
Episode 50      Last reward: 44.00      Average reward: 30.03
Episode 60      Last reward: 111.00     Average reward: 42.23
Episode 70      Last reward: 131.00     Average reward: 70.11
Episode 80      Last reward: 87.00      Average reward: 76.51
Episode 90      Last reward: 86.00      Average reward: 95.93
Episode 100     Last reward: 13.00      Average reward: 123.93
Episode 110     Last reward: 33.00      Average reward: 91.39
Episode 120     Last reward: 73.00      Average reward: 76.38
Episode 130     Last reward: 137.00     Average reward: 88.08
Episode 140     Last reward: 89.00      Average reward: 104.96
Episode 150     Last reward: 97.00      Average reward: 98.74
Episode 160     Last reward: 150.00     Average reward: 100.87
Episode 170     Last reward: 126.00     Average reward: 104.38
Episode 180     Last reward: 500.00     Average reward: 213.74
Episode 190     Last reward: 322.00     Average reward: 300.22
Episode 200     Last reward: 165.00     Average reward: 272.71
Episode 210     Last reward: 168.00     Average reward: 233.11
Episode 220     Last reward: 184.00     Average reward: 195.02
Episode 230     Last reward: 284.00     Average reward: 208.32
Episode 240     Last reward: 395.00     Average reward: 247.37
Episode 250     Last reward: 500.00     Average reward: 335.42
Episode 260     Last reward: 500.00     Average reward: 386.30
Episode 270     Last reward: 500.00     Average reward: 405.29
Episode 280     Last reward: 500.00     Average reward: 443.29
Episode 290     Last reward: 500.00     Average reward: 464.65
Solved! Running reward is now 475.3163778435275! 
• 31

在这个例子中,我们展示了如何使用 RPC 作为通信工具在工作器之间传递数据,以及如何使用 RRef 引用远程对象。当然,您可以直接在ProcessGroup sendrecv API 之上构建整个结构,或者使用其他通信/RPC 库。然而,通过使用 torch.distributed.rpc,您可以获得本地支持,并在幕后持续优化性能。

接下来,我们将展示如何结合 RPC 和 RRef 与分布式自动求导和分布式优化器来执行分布式模型并行训练。

使用分布式自动求导和分布式优化器的分布式 RNN

在本节中,我们使用一个 RNN 模型来展示如何使用 RPC API 构建分布式模型并行训练。示例 RNN 模型非常小,可以轻松适应单个 GPU,但我们仍将其层分布到两个不同的工作器上以演示这个想法。开发人员可以应用类似的技术将更大的模型分布到多个设备和机器上。

RNN 模型设计借鉴了 PyTorch 示例 仓库中的单词语言模型,其中包含三个主要组件,一个嵌入表,一个LSTM层和一个解码器。下面的代码将嵌入表和解码器包装成子模块,以便它们的构造函数可以传递给 RPC API。在EmbeddingTable子模块中,我们故意将Embedding层放在 GPU 上以涵盖使用情况。在 v1.4 中,RPC 始终在目标工作器上创建 CPU 张量参数或返回值。如果函数接受 GPU 张量,则需要显式将其移动到适当的设备上。

class EmbeddingTable(nn.Module):
  r"""
 Encoding layers of the RNNModel
 """
    def __init__(self, ntoken, ninp, dropout):
        super(EmbeddingTable, self).__init__()
        self.drop = nn.Dropout(dropout)
        self.encoder = nn.Embedding(ntoken, ninp).cuda()
        self.encoder.weight.data.uniform_(-0.1, 0.1)
    def forward(self, input):
        return self.drop(self.encoder(input.cuda()).cpu()
class Decoder(nn.Module):
    def __init__(self, ntoken, nhid, dropout):
        super(Decoder, self).__init__()
        self.drop = nn.Dropout(dropout)
        self.decoder = nn.Linear(nhid, ntoken)
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-0.1, 0.1)
    def forward(self, output):
        return self.decoder(self.drop(output)) 

通过上述子模块,我们现在可以使用 RPC 将它们组合在一起创建一个 RNN 模型。在下面的代码中,ps代表参数服务器,它承载嵌入表和解码器的参数。构造函数使用remote API 在参数服务器上创建一个EmbeddingTable对象和一个Decoder对象,并在本地创建LSTM子模块。在前向传播过程中,训练器使用EmbeddingTableRRef来找到远程子模块,并通过 RPC 将输入数据传递给EmbeddingTable并获取查找结果。然后,它通过本地的LSTM层运行嵌入,最后使用另一个 RPC 将输出发送到Decoder子模块。通常,为了实现分布式模型并行训练,开发人员可以将模型划分为子模块,调用 RPC 远程创建子模块实例,并在必要时使用RRef来找到它们。正如您在下面的代码中所看到的,它看起来非常类似于单机模型并行训练。主要区别是用 RPC 函数替换Tensor.to(device)

class RNNModel(nn.Module):
    def __init__(self, ps, ntoken, ninp, nhid, nlayers, dropout=0.5):
        super(RNNModel, self).__init__()
        # setup embedding table remotely
        self.emb_table_rref = rpc.remote(ps, EmbeddingTable, args=(ntoken, ninp, dropout))
        # setup LSTM locally
        self.rnn = nn.LSTM(ninp, nhid, nlayers, dropout=dropout)
        # setup decoder remotely
        self.decoder_rref = rpc.remote(ps, Decoder, args=(ntoken, nhid, dropout))
    def forward(self, input, hidden):
        # pass input to the remote embedding table and fetch emb tensor back
        emb = _remote_method(EmbeddingTable.forward, self.emb_table_rref, input)
        output, hidden = self.rnn(emb, hidden)
        # pass output to the rremote decoder and get the decoded output back
        decoded = _remote_method(Decoder.forward, self.decoder_rref, output)
        return decoded, hidden 

在介绍分布式优化器之前,让我们添加一个辅助函数来生成模型参数的 RRef 列表,这将被分布式优化器使用。在本地训练中,应用程序可以调用Module.parameters()来获取所有参数张量的引用,并将其传递给本地优化器进行后续更新。然而,在分布式训练场景中,相同的 API 不起作用,因为一些参数存在于远程机器上。因此,分布式优化器不是接受参数Tensors列表,而是接受RRefs列表,每个模型参数都有一个RRef,用于本地和远程模型参数。辅助函数非常简单,只需调用Module.parameters()并在每个参数上创建一个本地RRef

def _parameter_rrefs(module):
    param_rrefs = []
    for param in module.parameters():
        param_rrefs.append(RRef(param))
    return param_rrefs 

然后,由于RNNModel包含三个子模块,我们需要三次调用_parameter_rrefs,并将其包装到另一个辅助函数中。

class RNNModel(nn.Module):
    ...
    def parameter_rrefs(self):
        remote_params = []
        # get RRefs of embedding table
        remote_params.extend(_remote_method(_parameter_rrefs, self.emb_table_rref))
        # create RRefs for local parameters
        remote_params.extend(_parameter_rrefs(self.rnn))
        # get RRefs of decoder
        remote_params.extend(_remote_method(_parameter_rrefs, self.decoder_rref))
        return remote_params 

现在,我们准备实现训练循环。在初始化模型参数后,我们创建RNNModelDistributedOptimizer。分布式优化器将获取参数RRefs列表,找到所有不同的所有者工作节点,并使用给定参数(即,在本例中为lr=0.05)在每个所有者工作节点上创建给定的本地优化器(即SGD,您也可以使用其他本地优化器)。

在训练循环中,首先创建一个分布式自动求导上下文,这将帮助分布式自动求导引擎找到梯度和涉及的 RPC 发送/接收函数。分布式自动求导引擎的设计细节可以在其设计说明中找到。然后,启动前向传播,就像是一个本地模型,然后运行分布式反向传播。对于分布式反向传播,您只需要指定一个根列表,在本例中,它是损失Tensor。分布式自动求导引擎将自动遍历分布式图并正确写入梯度。接下来,在分布式优化器上运行step函数,这将联系到所有涉及的本地优化器来更新模型参数。与本地训练相比,一个小的区别是您不需要运行zero_grad(),因为每个自动求导上下文都有专用空间来存储梯度,并且由于我们每次迭代创建一个上下文,来自不同迭代的梯度不会累积到相同的Tensors集合中。

def run_trainer():
    batch = 5
    ntoken = 10
    ninp = 2
    nhid = 3
    nindices = 3
    nlayers = 4
    hidden = (
        torch.randn(nlayers, nindices, nhid),
        torch.randn(nlayers, nindices, nhid)
    )
    model = rnn.RNNModel('ps', ntoken, ninp, nhid, nlayers)
    # setup distributed optimizer
    opt = DistributedOptimizer(
        optim.SGD,
        model.parameter_rrefs(),
        lr=0.05,
    )
    criterion = torch.nn.CrossEntropyLoss()
    def get_next_batch():
        for _ in range(5):
            data = torch.LongTensor(batch, nindices) % ntoken
            target = torch.LongTensor(batch, ntoken) % nindices
            yield data, target
    # train for 10 iterations
    for epoch in range(10):
        for data, target in get_next_batch():
            # create distributed autograd context
            with dist_autograd.context() as context_id:
                hidden[0].detach_()
                hidden[1].detach_()
                output, hidden = model(data, hidden)
                loss = criterion(output, target)
                # run distributed backward pass
                dist_autograd.backward(context_id, [loss])
                # run distributed optimizer
                opt.step(context_id)
                # not necessary to zero grads since they are
                # accumulated into the distributed autograd context
                # which is reset every iteration.
        print("Training epoch {}".format(epoch)) 

最后,让我们添加一些粘合代码来启动参数服务器和训练器进程。

def run_worker(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 1:
        rpc.init_rpc("trainer", rank=rank, world_size=world_size)
        _run_trainer()
    else:
        rpc.init_rpc("ps", rank=rank, world_size=world_size)
        # parameter server do nothing
        pass
    # block until all rpcs finish
    rpc.shutdown()
if __name__=="__main__":
    world_size = 2
    mp.spawn(run_worker, args=(world_size, ), nprocs=world_size, join=True) 

PyTorch 2.2 中文官方教程(十八)(4)https://developer.aliyun.com/article/1482610

相关实践学习
基于阿里云DeepGPU实例,用AI画唯美国风少女
本实验基于阿里云DeepGPU实例,使用aiacctorch加速stable-diffusion-webui,用AI画唯美国风少女,可提升性能至高至原性能的2.6倍。
相关文章
|
15天前
|
Android开发 PyTorch 算法框架/工具
PyTorch 2.2 中文官方教程(二十)(2)
PyTorch 2.2 中文官方教程(二十)
42 0
PyTorch 2.2 中文官方教程(二十)(2)
|
15天前
|
iOS开发 PyTorch 算法框架/工具
PyTorch 2.2 中文官方教程(二十)(1)
PyTorch 2.2 中文官方教程(二十)
45 0
PyTorch 2.2 中文官方教程(二十)(1)
|
PyTorch 算法框架/工具 并行计算
PyTorch 2.2 中文官方教程(十九)(4)
PyTorch 2.2 中文官方教程(十九)
27 0
|
15天前
|
机器学习/深度学习 PyTorch 算法框架/工具
PyTorch 2.2 中文官方教程(十八)(4)
PyTorch 2.2 中文官方教程(十八)
53 1
|
15天前
|
并行计算 PyTorch 算法框架/工具
PyTorch 2.2 中文官方教程(十七)(4)
PyTorch 2.2 中文官方教程(十七)
25 2
PyTorch 2.2 中文官方教程(十七)(4)
|
15天前
|
PyTorch 算法框架/工具 机器学习/深度学习
PyTorch 2.2 中文官方教程(十七)(2)
PyTorch 2.2 中文官方教程(十七)
38 1
PyTorch 2.2 中文官方教程(十七)(2)
|
15天前
|
机器学习/深度学习 PyTorch 算法框架/工具
PyTorch 2.2 中文官方教程(十五)(1)
PyTorch 2.2 中文官方教程(十五)
46 1
|
机器学习/深度学习 PyTorch 算法框架/工具
PyTorch 2.2 中文官方教程(十四)(4)
PyTorch 2.2 中文官方教程(十四)
63 1
PyTorch 2.2 中文官方教程(十四)(4)
|
15天前
|
机器学习/深度学习 PyTorch 算法框架/工具
PyTorch 2.2 中文官方教程(十四)(2)
PyTorch 2.2 中文官方教程(十四)
49 1
|
3月前
|
机器学习/深度学习 编解码 PyTorch
Pytorch实现手写数字识别 | MNIST数据集(CNN卷积神经网络)
Pytorch实现手写数字识别 | MNIST数据集(CNN卷积神经网络)