PyTorch 2.2 中文官方教程(十九)(2)

本文涉及的产品
函数计算FC,每月15万CU 3个月
简介: PyTorch 2.2 中文官方教程(十九)

PyTorch 2.2 中文官方教程(十九)(1)https://developer.aliyun.com/article/1482611

批处理 CartPole 求解器

本节使用OpenAI Gym中的 CartPole-v1 作为示例,展示批处理 RPC 的性能影响。请注意,由于目标是演示@rpc.functions.async_execution的用法,而不是构建最佳 CartPole 求解器或解决最多不同的 RL 问题,我们使用非常简单的策略和奖励计算策略,并专注于多观察者单代理批处理 RPC 实现。我们使用与之前教程相似的Policy模型,如下所示。与之前的教程相比,不同之处在于其构造函数接受一个额外的batch参数,该参数控制F.softmaxdim参数,因为在批处理中,forward函数中的x参数包含来自多个观察者的状态,因此维度需要适当更改。其他一切保持不变。

import argparse
import torch.nn as nn
import torch.nn.functional as F
parser = argparse.ArgumentParser(description='PyTorch RPC Batch RL example')
parser.add_argument('--gamma', type=float, default=1.0, metavar='G',
                    help='discount factor (default: 1.0)')
parser.add_argument('--seed', type=int, default=543, metavar='N',
                    help='random seed (default: 543)')
parser.add_argument('--num-episode', type=int, default=10, metavar='E',
                    help='number of episodes (default: 10)')
args = parser.parse_args()
torch.manual_seed(args.seed)
class Policy(nn.Module):
    def __init__(self, batch=True):
        super(Policy, self).__init__()
        self.affine1 = nn.Linear(4, 128)
        self.dropout = nn.Dropout(p=0.6)
        self.affine2 = nn.Linear(128, 2)
        self.dim = 2 if batch else 1
    def forward(self, x):
        x = self.affine1(x)
        x = self.dropout(x)
        x = F.relu(x)
        action_scores = self.affine2(x)
        return F.softmax(action_scores, dim=self.dim) 

Observer的构造函数也相应地进行调整。它还接受一个batch参数,用于控制它使用哪个Agent函数来选择动作。在批处理模式下,它调用Agent上的select_action_batch函数,该函数将很快被介绍,并且此函数将被装饰为@rpc.functions.async_execution

import gym
import torch.distributed.rpc as rpc
class Observer:
    def __init__(self, batch=True):
        self.id = rpc.get_worker_info().id - 1
        self.env = gym.make('CartPole-v1')
        self.env.seed(args.seed)
        self.select_action = Agent.select_action_batch if batch else Agent.select_action 

与之前的教程使用分布式 RPC 框架入门相比,观察者的行为有些不同。它不是在环境停止时退出,而是在每个情节中始终运行n_steps次迭代。当环境返回时,观察者简单地重置环境并重新开始。通过这种设计,代理将从每个观察者接收固定数量的状态,因此可以将它们打包到固定大小的张量中。在每一步中,Observer使用 RPC 将其状态发送给Agent,并通过返回值获取动作。在每个情节结束时,它将所有步骤的奖励返回给Agent。请注意,run_episode函数将由Agent使用 RPC 调用。因此,此函数中的rpc_sync调用将是一个嵌套的 RPC 调用。我们也可以将此函数标记为@rpc.functions.async_execution,以避免在Observer上阻塞一个线程。然而,由于瓶颈是Agent而不是Observer,在Observer进程上阻塞一个线程应该是可以接受的。

import torch
class Observer:
    ...
    def run_episode(self, agent_rref, n_steps):
        state, ep_reward = self.env.reset(), NUM_STEPS
        rewards = torch.zeros(n_steps)
        start_step = 0
        for step in range(n_steps):
            state = torch.from_numpy(state).float().unsqueeze(0)
            # send the state to the agent to get an action
            action = rpc.rpc_sync(
                agent_rref.owner(),
                self.select_action,
                args=(agent_rref, self.id, state)
            )
            # apply the action to the environment, and get the reward
            state, reward, done, _ = self.env.step(action)
            rewards[step] = reward
            if done or step + 1 >= n_steps:
                curr_rewards = rewards[start_step:(step + 1)]
                R = 0
                for i in range(curr_rewards.numel() -1, -1, -1):
                    R = curr_rewards[i] + args.gamma * R
                    curr_rewards[i] = R
                state = self.env.reset()
                if start_step == 0:
                    ep_reward = min(ep_reward, step - start_step + 1)
                start_step = step + 1
        return [rewards, ep_reward] 

Agent的构造函数还接受一个batch参数,用于控制如何对动作概率进行批处理。在批处理模式下,saved_log_probs包含一个张量列表,其中每个张量包含一个步骤中所有观察者的动作概率。没有批处理时,saved_log_probs是一个字典,其中键是观察者 ID,值是该观察者的动作概率列表。

import threading
from torch.distributed.rpc import RRef
class Agent:
    def __init__(self, world_size, batch=True):
        self.ob_rrefs = []
        self.agent_rref = RRef(self)
        self.rewards = {}
        self.policy = Policy(batch).cuda()
        self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
        self.running_reward = 0
        for ob_rank in range(1, world_size):
            ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank))
            self.ob_rrefs.append(rpc.remote(ob_info, Observer, args=(batch,)))
            self.rewards[ob_info.id] = []
        self.states = torch.zeros(len(self.ob_rrefs), 1, 4)
        self.batch = batch
        self.saved_log_probs = [] if batch else {k:[] for k in range(len(self.ob_rrefs))}
        self.future_actions = torch.futures.Future()
        self.lock = threading.Lock()
        self.pending_states = len(self.ob_rrefs) 

非批处理的select_action简单地通过策略运行状态,保存动作概率,并立即将动作返回给观察者。

from torch.distributions import Categorical
class Agent:
    ...
    @staticmethod
    def select_action(agent_rref, ob_id, state):
        self = agent_rref.local_value()
        probs = self.policy(state.cuda())
        m = Categorical(probs)
        action = m.sample()
        self.saved_log_probs[ob_id].append(m.log_prob(action))
        return action.item() 

通过批处理,状态存储在一个二维张量self.states中,使用观察者 ID 作为行 ID。然后,它通过安装回调函数到批处理生成的self.future_actions Future对象来链接一个Future,该对象将用特定行索引填充,使用观察者的 ID。最后到达的观察者将所有批处理状态一次性通过策略运行,并相应地设置self.future_actions。当这发生时,所有安装在self.future_actions上的回调函数将被触发,它们的返回值将用于填充链接的Future对象,进而通知Agent为其他观察者之前的所有 RPC 请求准备和通信响应。

class Agent:
    ...
    @staticmethod
    @rpc.functions.async_execution
    def select_action_batch(agent_rref, ob_id, state):
        self = agent_rref.local_value()
        self.states[ob_id].copy_(state)
        future_action = self.future_actions.then(
            lambda future_actions: future_actions.wait()[ob_id].item()
        )
        with self.lock:
            self.pending_states -= 1
            if self.pending_states == 0:
                self.pending_states = len(self.ob_rrefs)
                probs = self.policy(self.states.cuda())
                m = Categorical(probs)
                actions = m.sample()
                self.saved_log_probs.append(m.log_prob(actions).t()[0])
                future_actions = self.future_actions
                self.future_actions = torch.futures.Future()
                future_actions.set_result(actions.cpu())
        return future_action 

现在让我们定义不同的 RPC 函数如何被串联在一起。Agent 控制每一集的执行。它首先使用 rpc_async 在所有观察者上启动集,并阻塞在返回的 futures 上,这些 futures 将填充观察者奖励。请注意,下面的代码使用 RRef 辅助函数 ob_rref.rpc_async() 在拥有 ob_rref RRef 的所有者上启动 run_episode 函数,并提供参数。然后将保存的动作概率和返回的观察者奖励转换为预期的数据格式,并启动训练步骤。最后,它重置所有状态并返回当前集的奖励。这个函数是运行一个集的入口点。

class Agent:
    ...
    def run_episode(self, n_steps=0):
        futs = []
        for ob_rref in self.ob_rrefs:
            # make async RPC to kick off an episode on all observers
            futs.append(ob_rref.rpc_async().run_episode(self.agent_rref, n_steps))
        # wait until all obervers have finished this episode
        rets = torch.futures.wait_all(futs)
        rewards = torch.stack([ret[0] for ret in rets]).cuda().t()
        ep_rewards = sum([ret[1] for ret in rets]) / len(rets)
        # stack saved probs into one tensor
        if self.batch:
            probs = torch.stack(self.saved_log_probs)
        else:
            probs = [torch.stack(self.saved_log_probs[i]) for i in range(len(rets))]
            probs = torch.stack(probs)
        policy_loss = -probs * rewards / len(rets)
        policy_loss.sum().backward()
        self.optimizer.step()
        self.optimizer.zero_grad()
        # reset variables
        self.saved_log_probs = [] if self.batch else {k:[] for k in range(len(self.ob_rrefs))}
        self.states = torch.zeros(len(self.ob_rrefs), 1, 4)
        # calculate running rewards
        self.running_reward = 0.5 * ep_rewards + 0.5 * self.running_reward
        return ep_rewards, self.running_reward 

代码的其余部分是正常的进程启动和日志记录,与其他 RPC 教程类似。在本教程中,所有观察者都 passively 等待来自 agent 的命令。请参考 examples 仓库获取完整的实现。

def run_worker(rank, world_size, n_episode, batch, print_log=True):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 0:
        # rank0 is the agent
        rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size)
        agent = Agent(world_size, batch)
        for i_episode in range(n_episode):
            last_reward, running_reward = agent.run_episode(n_steps=NUM_STEPS)
            if print_log:
                print('Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f}'.format(
                    i_episode, last_reward, running_reward))
    else:
        # other ranks are the observer
        rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size)
        # observers passively waiting for instructions from agents
    rpc.shutdown()
def main():
    for world_size in range(2, 12):
        delays = []
        for batch in [True, False]:
            tik = time.time()
            mp.spawn(
                run_worker,
                args=(world_size, args.num_episode, batch),
                nprocs=world_size,
                join=True
            )
            tok = time.time()
            delays.append(tok - tik)
        print(f"{world_size}, {delays[0]}, {delays[1]}")
if __name__ == '__main__':
    main() 

批处理 RPC 有助于将动作推断整合为更少的 CUDA 操作,从而减少摊销开销。上面的 main 函数在批处理和非批处理模式下运行相同的代码,使用不同数量的观察者,范围从 1 到 10。下面的图表显示了使用默认参数值时不同世界大小的执行时间。结果证实了我们的预期,批处理有助于加快训练速度。

了解更多

将分布式 DataParallel 与分布式 RPC 框架结合起来

原文:pytorch.org/tutorials/advanced/rpc_ddp_tutorial.html

译者:飞龙

协议:CC BY-NC-SA 4.0

作者Pritam DamaniaYi Wang

注意

github中查看和编辑本教程。

本教程使用一个简单的示例来演示如何将DistributedDataParallel(DDP)与Distributed RPC framework结合起来,以将分布式数据并行与分布式模型并行结合起来训练一个简单的模型。示例的源代码可以在这里找到。

之前的教程,Getting Started With Distributed Data ParallelGetting Started with Distributed RPC Framework,分别描述了如何执行分布式数据并行和分布式模型并行训练。尽管如此,有几种训练范式可能需要结合这两种技术。例如:

  1. 如果我们的模型有一个稀疏部分(大型嵌入表)和一个稠密部分(FC 层),我们可能希望将嵌入表放在参数服务器上,并使用DistributedDataParallel将 FC 层复制到多个训练器上。Distributed RPC framework可用于在参数服务器上执行嵌入查找。
  2. 启用混合并行,如PipeDream论文中所述。我们可以使用Distributed RPC framework将模型的阶段在多个工作节点上进行流水线处理,并使用DistributedDataParallel复制每个阶段(如果需要)。

在本教程中,我们将涵盖上述第 1 种情况。在我们的设置中,总共有 4 个工作节点:

  1. 1 个主节点,负责在参数服务器上创建一个嵌入表(nn.EmbeddingBag)。主节点还驱动两个训练器的训练循环。
  2. 1 个参数服务器,基本上在内存中保存嵌入表,并响应来自主节点和训练器的 RPC。
  3. 2 个训练器,它们存储一个在它们之间复制的 FC 层(nn.Linear),使用DistributedDataParallel。这些训练器还负责执行前向传播、反向传播和优化器步骤。

整个训练过程如下执行:

  1. 主节点创建一个RemoteModule,在参数服务器上保存一个嵌入表。
  2. 然后主节点启动训练循环,并将远程模块传递给训练器。
  3. 训练器创建一个HybridModel,首先使用主节点提供的远程模块进行嵌入查找,然后执行包含在 DDP 中的 FC 层。
  4. 训练器执行模型的前向传播,并使用损失执行反向传播,使用Distributed Autograd
  5. 在反向传播的过程中,首先计算 FC 层的梯度,然后通过 DDP 中的 allreduce 同步到所有训练器。
  6. 接下来,Distributed Autograd 将梯度传播到参数服务器,更新嵌入表的梯度。
  7. 最后,使用Distributed Optimizer来更新所有参数。

注意

如果结合 DDP 和 RPC,应始终使用Distributed Autograd进行反向传播。

现在,让我们逐个详细介绍每个部分。首先,我们需要在进行任何训练之前设置所有的 worker。我们创建 4 个进程,其中 rank 0 和 1 是我们的 Trainer,rank 2 是主节点,rank 3 是参数服务器。

我们使用 TCP init_method 在所有 4 个 worker 上初始化 RPC 框架。一旦 RPC 初始化完成,主节点会创建一个远程模块,该模块在参数服务器上保存了一个EmbeddingBag层,使用RemoteModule。然后主节点循环遍历每个 Trainer,并通过调用rpc_async在每个 Trainer 上调用_run_trainer来启动训练循环。最后,主节点在退出之前等待所有训练完成。

Trainer 首先为 DDP 初始化一个 world_size=2(两个 Trainer)的ProcessGroup,使用init_process_group。接下来,他们使用 TCP init_method 初始化 RPC 框架。请注意,RPC 初始化和 ProcessGroup 初始化中的端口是不同的。这是为了避免两个框架初始化之间的端口冲突。初始化完成后,Trainer 只需等待来自主节点的_run_trainer RPC。

参数服务器只是初始化 RPC 框架并等待来自 Trainer 和主节点的 RPC。

def run_worker(rank, world_size):
  r"""
 A wrapper function that initializes RPC, calls the function, and shuts down
 RPC.
 """
    # We need to use different port numbers in TCP init_method for init_rpc and
    # init_process_group to avoid port conflicts.
    rpc_backend_options = TensorPipeRpcBackendOptions()
    rpc_backend_options.init_method = "tcp://localhost:29501"
    # Rank 2 is master, 3 is ps and 0 and 1 are trainers.
    if rank == 2:
        rpc.init_rpc(
            "master",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )
        remote_emb_module = RemoteModule(
            "ps",
            torch.nn.EmbeddingBag,
            args=(NUM_EMBEDDINGS, EMBEDDING_DIM),
            kwargs={"mode": "sum"},
        )
        # Run the training loop on trainers.
        futs = []
        for trainer_rank in [0, 1]:
            trainer_name = "trainer{}".format(trainer_rank)
            fut = rpc.rpc_async(
                trainer_name, _run_trainer, args=(remote_emb_module, trainer_rank)
            )
            futs.append(fut)
        # Wait for all training to finish.
        for fut in futs:
            fut.wait()
    elif rank <= 1:
        # Initialize process group for Distributed DataParallel on trainers.
        dist.init_process_group(
            backend="gloo", rank=rank, world_size=2, init_method="tcp://localhost:29500"
        )
        # Initialize RPC.
        trainer_name = "trainer{}".format(rank)
        rpc.init_rpc(
            trainer_name,
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )
        # Trainer just waits for RPCs from master.
    else:
        rpc.init_rpc(
            "ps",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )
        # parameter server do nothing
        pass
    # block until all rpcs finish
    rpc.shutdown()
if __name__ == "__main__":
    # 2 trainers, 1 parameter server, 1 master.
    world_size = 4
    mp.spawn(run_worker, args=(world_size,), nprocs=world_size, join=True) 

在讨论 Trainer 的细节之前,让我们先介绍一下 Trainer 使用的HybridModel。如下所述,HybridModel是使用一个远程模块进行初始化的,该远程模块在参数服务器上保存了一个嵌入表(remote_emb_module)和用于 DDP 的device。模型的初始化将一个nn.Linear层包装在 DDP 中,以便在所有 Trainer 之间复制和同步这个层。

模型的前向方法非常简单。它使用 RemoteModule 的forward在参数服务器上进行嵌入查找,并将其输出传递给 FC 层。

class HybridModel(torch.nn.Module):
  r"""
 The model consists of a sparse part and a dense part.
 1) The dense part is an nn.Linear module that is replicated across all trainers using DistributedDataParallel.
 2) The sparse part is a Remote Module that holds an nn.EmbeddingBag on the parameter server.
 This remote model can get a Remote Reference to the embedding table on the parameter server.
 """
    def __init__(self, remote_emb_module, device):
        super(HybridModel, self).__init__()
        self.remote_emb_module = remote_emb_module
        self.fc = DDP(torch.nn.Linear(16, 8).cuda(device), device_ids=[device])
        self.device = device
    def forward(self, indices, offsets):
        emb_lookup = self.remote_emb_module.forward(indices, offsets)
        return self.fc(emb_lookup.cuda(self.device)) 

接下来,让我们看一下 Trainer 的设置。Trainer 首先使用一个远程模块创建上述HybridModel,该远程模块在参数服务器上保存了嵌入表和自己的 rank。

现在,我们需要获取一个 RRefs 列表,其中包含我们想要使用DistributedOptimizer进行优化的所有参数。为了从参数服务器检索嵌入表的参数,我们可以调用 RemoteModule 的remote_parameters,这个方法基本上遍历了嵌入表的所有参数,并返回一个 RRefs 列表。Trainer 通过 RPC 在参数服务器上调用这个方法,以接收到所需参数的 RRefs 列表。由于 DistributedOptimizer 始终需要一个要优化的参数的 RRefs 列表,我们需要为 FC 层的本地参数创建 RRefs。这是通过遍历model.fc.parameters(),为每个参数创建一个 RRef,并将其附加到从remote_parameters()返回的列表中完成的。请注意,我们不能使用model.parameters(),因为它会递归调用model.remote_emb_module.parameters(),这是RemoteModule不支持的。

最后,我们使用所有的 RRefs 创建我们的 DistributedOptimizer,并定义一个 CrossEntropyLoss 函数。

def _run_trainer(remote_emb_module, rank):
  r"""
 Each trainer runs a forward pass which involves an embedding lookup on the
 parameter server and running nn.Linear locally. During the backward pass,
 DDP is responsible for aggregating the gradients for the dense part
 (nn.Linear) and distributed autograd ensures gradients updates are
 propagated to the parameter server.
 """
    # Setup the model.
    model = HybridModel(remote_emb_module, rank)
    # Retrieve all model parameters as rrefs for DistributedOptimizer.
    # Retrieve parameters for embedding table.
    model_parameter_rrefs = model.remote_emb_module.remote_parameters()
    # model.fc.parameters() only includes local parameters.
    # NOTE: Cannot call model.parameters() here,
    # because this will call remote_emb_module.parameters(),
    # which supports remote_parameters() but not parameters().
    for param in model.fc.parameters():
        model_parameter_rrefs.append(RRef(param))
    # Setup distributed optimizer
    opt = DistributedOptimizer(
        optim.SGD,
        model_parameter_rrefs,
        lr=0.05,
    )
    criterion = torch.nn.CrossEntropyLoss() 

现在我们准备介绍在每个训练器上运行的主要训练循环。get_next_batch只是一个辅助函数,用于生成训练的随机输入和目标。我们对多个 epochs 和每个 batch 运行训练循环:

  1. 为分布式自动求导设置Distributed Autograd Context
  2. 运行模型的前向传播并检索其输出。
  3. 使用损失函数基于我们的输出和目标计算损失。
  4. 使用分布式自动求导来执行使用损失函数的分布式反向传播。
  5. 最后,运行一个分布式优化器步骤来优化所有参数。
def get_next_batch(rank):
        for _ in range(10):
            num_indices = random.randint(20, 50)
            indices = torch.LongTensor(num_indices).random_(0, NUM_EMBEDDINGS)
            # Generate offsets.
            offsets = []
            start = 0
            batch_size = 0
            while start < num_indices:
                offsets.append(start)
                start += random.randint(1, 10)
                batch_size += 1
            offsets_tensor = torch.LongTensor(offsets)
            target = torch.LongTensor(batch_size).random_(8).cuda(rank)
            yield indices, offsets_tensor, target
    # Train for 100 epochs
    for epoch in range(100):
        # create distributed autograd context
        for indices, offsets, target in get_next_batch(rank):
            with dist_autograd.context() as context_id:
                output = model(indices, offsets)
                loss = criterion(output, target)
                # Run distributed backward pass
                dist_autograd.backward(context_id, [loss])
                # Tun distributed optimizer
                opt.step(context_id)
                # Not necessary to zero grads as each iteration creates a different
                # distributed autograd context which hosts different grads
        print("Training done for epoch {}".format(epoch)) 

整个示例的源代码可以在这里找到。

使用管道并行性训练 Transformer 模型

原文:pytorch.org/tutorials/intermediate/pipeline_tutorial.html

译者:飞龙

协议:CC BY-NC-SA 4.0

注意

点击这里下载完整示例代码

作者Pritam Damania

本教程演示了如何使用管道并行性在多个 GPU 上训练大型 Transformer 模型。本教程是使用 nn.Transformer 和 TorchText 进行序列到序列建模教程的延伸,并扩展了相同的模型,以演示如何使用管道并行性来训练 Transformer 模型。

先决条件:

定义模型

在本教程中,我们将把一个 Transformer 模型分成两个 GPU,并使用管道并行性来训练模型。该模型与使用 nn.Transformer 和 TorchText 进行序列到序列建模教程中使用的模型完全相同,但被分成两个阶段。最大数量的参数属于nn.TransformerEncoder层。nn.TransformerEncoder本身由nlayersnn.TransformerEncoderLayer组成。因此,我们的重点是nn.TransformerEncoder,我们将模型分成一半的nn.TransformerEncoderLayer在一个 GPU 上,另一半在另一个 GPU 上。为此,我们将EncoderDecoder部分提取到单独的模块中,然后构建一个代表原始 Transformer 模块的nn.Sequential

import sys
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
import tempfile
from torch.nn import TransformerEncoder, TransformerEncoderLayer
if sys.platform == 'win32':
    print('Windows platform is not supported for pipeline parallelism')
    sys.exit(0)
if torch.cuda.device_count() < 2:
    print('Need at least two GPU devices for this tutorial')
    sys.exit(0)
class Encoder(nn.Module):
    def __init__(self, ntoken, ninp, dropout=0.5):
        super(Encoder, self).__init__()
        self.pos_encoder = PositionalEncoding(ninp, dropout)
        self.encoder = nn.Embedding(ntoken, ninp)
        self.ninp = ninp
        self.init_weights()
    def init_weights(self):
        initrange = 0.1
        self.encoder.weight.data.uniform_(-initrange, initrange)
    def forward(self, src):
        # Need (S, N) format for encoder.
        src = src.t()
        src = self.encoder(src) * math.sqrt(self.ninp)
        return self.pos_encoder(src)
class Decoder(nn.Module):
    def __init__(self, ntoken, ninp):
        super(Decoder, self).__init__()
        self.decoder = nn.Linear(ninp, ntoken)
        self.init_weights()
    def init_weights(self):
        initrange = 0.1
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-initrange, initrange)
    def forward(self, inp):
        # Need batch dimension first for output of pipeline.
        return self.decoder(inp).permute(1, 0, 2) 

PositionalEncoding模块注入了关于序列中标记的相对或绝对位置的一些信息。位置编码与嵌入具有相同的维度,因此可以将两者相加。在这里,我们使用不同频率的sinecosine函数。

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)
    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x) 

加载和批处理数据

训练过程使用了来自torchtext的 Wikitext-2 数据集。要访问 torchtext 数据集,请按照github.com/pytorch/data上的说明安装 torchdata。

vocab 对象是基于训练数据集构建的,并用于将标记数值化为张量。从顺序数据开始,batchify()函数将数据集排列成列,将数据分成大小为batch_size的批次后,修剪掉任何剩余的标记。例如,以字母表作为序列(总长度为 26)和批次大小为 4,我们将字母表分成长度为 6 的 4 个序列:


image.png

image.png

模型将这些列视为独立的,这意味着无法学习GF之间的依赖关系,但可以实现更高效的批处理。

import torch
from torchtext.datasets import WikiText2
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
train_iter = WikiText2(split='train')
tokenizer = get_tokenizer('basic_english')
vocab = build_vocab_from_iterator(map(tokenizer, train_iter), specials=["<unk>"])
vocab.set_default_index(vocab["<unk>"])
def data_process(raw_text_iter):
  data = [torch.tensor(vocab(tokenizer(item)), dtype=torch.long) for item in raw_text_iter]
  return torch.cat(tuple(filter(lambda t: t.numel() > 0, data)))
train_iter, val_iter, test_iter = WikiText2()
train_data = data_process(train_iter)
val_data = data_process(val_iter)
test_data = data_process(test_iter)
device = torch.device("cuda")
def batchify(data, bsz):
    # Divide the dataset into ``bsz`` parts.
    nbatch = data.size(0) // bsz
    # Trim off any extra elements that wouldn't cleanly fit (remainders).
    data = data.narrow(0, 0, nbatch * bsz)
    # Evenly divide the data across the ``bsz` batches.
    data = data.view(bsz, -1).t().contiguous()
    return data.to(device)
batch_size = 20
eval_batch_size = 10
train_data = batchify(train_data, batch_size)
val_data = batchify(val_data, eval_batch_size)
test_data = batchify(test_data, eval_batch_size) 

生成输入和目标序列的函数

get_batch()函数为 transformer 模型生成输入和目标序列。它将源数据细分为长度为bptt的块。对于语言建模任务,模型需要以下单词作为Target。例如,对于bptt值为 2,我们会得到i = 0 时的以下两个变量:

应该注意到,块沿着维度 0,与 Transformer 模型中的S维度一致。批量维度N沿着维度 1。

bptt = 25
def get_batch(source, i):
    seq_len = min(bptt, len(source) - 1 - i)
    data = source[i:i+seq_len]
    target = source[i+1:i+1+seq_len].view(-1)
    # Need batch dimension first for pipeline parallelism.
    return data.t(), target 

模型规模和 Pipe 初始化

为了展示使用管道并行性训练大型 Transformer 模型,我们适当地扩展了 Transformer 层。我们使用了 4096 的嵌入维度,4096 的隐藏大小,16 个注意力头和 12 个总的 Transformer 层(nn.TransformerEncoderLayer)。这创建了一个拥有**~14 亿**参数的模型。

我们需要初始化RPC 框架,因为 Pipe 依赖于 RPC 框架通过RRef进行跨主机流水线扩展。我们需要仅使用单个 worker 初始化 RPC 框架,因为我们使用单个进程来驱动多个 GPU。

然后,在一个 GPU 上初始化 8 个 transformer 层,并在另一个 GPU 上初始化 8 个 transformer 层。

注意

为了提高效率,我们确保传递给Pipenn.Sequential只包含两个元素(对应两个 GPU),这允许 Pipe 仅使用两个分区并避免任何跨分区的开销。

ntokens = len(vocab) # the size of vocabulary
emsize = 4096 # embedding dimension
nhid = 4096 # the dimension of the feedforward network model in ``nn.TransformerEncoder``
nlayers = 12 # the number of ``nn.TransformerEncoderLayer`` in ``nn.TransformerEncoder``
nhead = 16 # the number of heads in the Multihead Attention models
dropout = 0.2 # the dropout value
from torch.distributed import rpc
tmpfile = tempfile.NamedTemporaryFile()
rpc.init_rpc(
    name="worker",
    rank=0,
    world_size=1,
    rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
        init_method="file://{}".format(tmpfile.name),
        # Specifying _transports and _channels is a workaround and we no longer
        # will have to specify _transports and _channels for PyTorch
        # versions >= 1.8.1
        _transports=["ibv", "uv"],
        _channels=["cuda_ipc", "cuda_basic"],
    )
)
num_gpus = 2
partition_len = ((nlayers - 1) // num_gpus) + 1
# Add encoder in the beginning.
tmp_list = [Encoder(ntokens, emsize, dropout).cuda(0)]
module_list = []
# Add all the necessary transformer blocks.
for i in range(nlayers):
    transformer_block = TransformerEncoderLayer(emsize, nhead, nhid, dropout)
    if i != 0 and i % (partition_len) == 0:
        module_list.append(nn.Sequential(*tmp_list))
        tmp_list = []
    device = i // (partition_len)
    tmp_list.append(transformer_block.to(device))
# Add decoder in the end.
tmp_list.append(Decoder(ntokens, emsize).cuda(num_gpus - 1))
module_list.append(nn.Sequential(*tmp_list))
from torch.distributed.pipeline.sync import Pipe
# Build the pipeline.
chunks = 8
model = Pipe(torch.nn.Sequential(*module_list), chunks = chunks)
def get_total_params(module: torch.nn.Module):
    total_params = 0
    for param in module.parameters():
        total_params += param.numel()
    return total_params
print ('Total parameters in model: {:,}'.format(get_total_params(model))) 
Total parameters in model: 1,444,261,998 

运行模型

CrossEntropyLoss用于跟踪损失,SGD实现随机梯度下降方法作为优化器。初始学习率设置为 5.0。StepLR用于通过 epoch 调整学习率。在训练期间,我们使用nn.utils.clip_grad_norm_函数将所有梯度一起缩放,以防止梯度爆炸。

criterion = nn.CrossEntropyLoss()
lr = 5.0 # learning rate
optimizer = torch.optim.SGD(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.95)
import time
def train():
    model.train() # Turn on the train mode
    total_loss = 0.
    start_time = time.time()
    ntokens = len(vocab)
    # Train only for 50 batches to keep script execution time low.
    nbatches = min(50 * bptt, train_data.size(0) - 1)
    for batch, i in enumerate(range(0, nbatches, bptt)):
        data, targets = get_batch(train_data, i)
        optimizer.zero_grad()
        # Since the Pipe is only within a single host and process the ``RRef``
        # returned by forward method is local to this node and can simply
        # retrieved via ``RRef.local_value()``.
        output = model(data).local_value()
        # Need to move targets to the device where the output of the
        # pipeline resides.
        loss = criterion(output.view(-1, ntokens), targets.cuda(1))
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
        optimizer.step()
        total_loss += loss.item()
        log_interval = 10
        if batch % log_interval == 0 and batch > 0:
            cur_loss = total_loss / log_interval
            elapsed = time.time() - start_time
            print('| epoch {:3d} | {:5d}/{:5d} batches | '
                  'lr {:02.2f} | ms/batch {:5.2f} | '
                  'loss {:5.2f} | ppl {:8.2f}'.format(
                    epoch, batch, nbatches // bptt, scheduler.get_lr()[0],
                    elapsed * 1000 / log_interval,
                    cur_loss, math.exp(cur_loss)))
            total_loss = 0
            start_time = time.time()
def evaluate(eval_model, data_source):
    eval_model.eval() # Turn on the evaluation mode
    total_loss = 0.
    ntokens = len(vocab)
    # Evaluate only for 50 batches to keep script execution time low.
    nbatches = min(50 * bptt, data_source.size(0) - 1)
    with torch.no_grad():
        for i in range(0, nbatches, bptt):
            data, targets = get_batch(data_source, i)
            output = eval_model(data).local_value()
            output_flat = output.view(-1, ntokens)
            # Need to move targets to the device where the output of the
            # pipeline resides.
            total_loss += len(data) * criterion(output_flat, targets.cuda(1)).item()
    return total_loss / (len(data_source) - 1) 

循环迭代。如果验证损失是迄今为止最好的,则保存模型。每个 epoch 后调整学习率。

best_val_loss = float("inf")
epochs = 3 # The number of epochs
best_model = None
for epoch in range(1, epochs + 1):
    epoch_start_time = time.time()
    train()
    val_loss = evaluate(model, val_data)
    print('-' * 89)
    print('| end of epoch {:3d} | time: {:5.2f}s | valid loss {:5.2f} | '
          'valid ppl {:8.2f}'.format(epoch, (time.time() - epoch_start_time),
                                     val_loss, math.exp(val_loss)))
    print('-' * 89)
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        best_model = model
    scheduler.step() 
/opt/conda/envs/py_3.10/lib/python3.10/site-packages/torch/optim/lr_scheduler.py:402: UserWarning:
To get the last learning rate computed by the scheduler, please use `get_last_lr()`.
| epoch   1 |    10/   50 batches | lr 5.00 | ms/batch 2955.60 | loss 51.97 | ppl 37278238304344674926592.00
| epoch   1 |    20/   50 batches | lr 5.00 | ms/batch 2626.09 | loss 39.16 | ppl 101468412802272112.00
| epoch   1 |    30/   50 batches | lr 5.00 | ms/batch 2627.16 | loss 45.74 | ppl 73373605537851539456.00
| epoch   1 |    40/   50 batches | lr 5.00 | ms/batch 2632.18 | loss 39.05 | ppl 90831844662671120.00
-----------------------------------------------------------------------------------------
| end of epoch   1 | time: 148.93s | valid loss  1.59 | valid ppl     4.92
-----------------------------------------------------------------------------------------
| epoch   2 |    10/   50 batches | lr 4.51 | ms/batch 2894.00 | loss 38.92 | ppl 79792098193225456.00
| epoch   2 |    20/   50 batches | lr 4.51 | ms/batch 2632.71 | loss 33.86 | ppl 508484255367480.44
| epoch   2 |    30/   50 batches | lr 4.51 | ms/batch 2630.00 | loss 29.47 | ppl 6267626426289.98
| epoch   2 |    40/   50 batches | lr 4.51 | ms/batch 2630.24 | loss 20.07 | ppl 521065165.54
-----------------------------------------------------------------------------------------
| end of epoch   2 | time: 148.40s | valid loss  0.54 | valid ppl     1.71
-----------------------------------------------------------------------------------------
| epoch   3 |    10/   50 batches | lr 4.29 | ms/batch 2891.16 | loss 13.75 | ppl 935925.21
| epoch   3 |    20/   50 batches | lr 4.29 | ms/batch 2629.50 | loss 10.74 | ppl 46322.74
| epoch   3 |    30/   50 batches | lr 4.29 | ms/batch 2629.95 | loss 10.97 | ppl 58152.80
| epoch   3 |    40/   50 batches | lr 4.29 | ms/batch 2629.52 | loss 11.29 | ppl 80130.60
-----------------------------------------------------------------------------------------
| end of epoch   3 | time: 148.36s | valid loss  0.24 | valid ppl     1.27
----------------------------------------------------------------------------------------- 

用测试数据集评估模型

应用最佳模型来检查与测试数据集的结果。

test_loss = evaluate(best_model, test_data)
print('=' * 89)
print('| End of training | test loss {:5.2f} | test ppl {:8.2f}'.format(
    test_loss, math.exp(test_loss)))
print('=' * 89) 
=========================================================================================
| End of training | test loss  0.21 | test ppl     1.23
========================================================================================= 

脚本的总运行时间:(8 分钟 5.064 秒)

下载 Python 源代码:pipeline_tutorial.py

下载 Jupyter 笔记本:pipeline_tutorial.ipynb

由 Sphinx-Gallery 生成的图库

使用 Distributed Data Parallel 和 Pipeline Parallelism 训练 Transformer 模型

原文:pytorch.org/tutorials/advanced/ddp_pipeline.html

译者:飞龙

协议:CC BY-NC-SA 4.0

注意

点击这里下载完整示例代码

作者Pritam Damania

本教程演示了如何使用Distributed Data ParallelPipeline Parallelism在多个 GPU 上训练大型 Transformer 模型。本教程是使用 nn.Transformer 和 TorchText 进行序列到序列建模教程的延伸,扩展了相同的模型以演示如何使用 Distributed Data Parallel 和 Pipeline Parallelism 来训练 Transformer 模型。

先决条件:

定义模型

PositionalEncoding 模块向序列中的令牌注入了一些关于相对或绝对位置的信息。位置编码与嵌入的维度相同,因此可以将两者相加。在这里,我们使用不同频率的 sinecosine 函数。

import sys
import os
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
import tempfile
from torch.nn import TransformerEncoder, TransformerEncoderLayer
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.pe = nn.Parameter(pe, requires_grad=False)
    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x) 

在本教程中,我们将一个 Transformer 模型分割到两个 GPU 上,并使用管道并行来训练模型。除此之外,我们使用Distributed Data Parallel来训练这个管道的两个副本。我们有一个进程在 GPU 0 和 1 之间驱动一个管道,另一个进程在 GPU 2 和 3 之间驱动一个管道。然后,这两个进程使用 Distributed Data Parallel 来训练这两个副本。模型与使用 nn.Transformer 和 TorchText 进行序列到序列建模教程中使用的模型完全相同,但被分成了两个阶段。最多的参数属于nn.TransformerEncoder层。nn.TransformerEncoder本身由nlayersnn.TransformerEncoderLayer组成。因此,我们的重点是nn.TransformerEncoder,我们将模型分割成一半的nn.TransformerEncoderLayer在一个 GPU 上,另一半在另一个 GPU 上。为此,我们将EncoderDecoder部分提取到单独的模块中,然后构建一个代表原始 Transformer 模块的nn.Sequential

if sys.platform == 'win32':
    print('Windows platform is not supported for pipeline parallelism')
    sys.exit(0)
if torch.cuda.device_count() < 4:
    print('Need at least four GPU devices for this tutorial')
    sys.exit(0)
class Encoder(nn.Module):
    def __init__(self, ntoken, ninp, dropout=0.5):
        super(Encoder, self).__init__()
        self.pos_encoder = PositionalEncoding(ninp, dropout)
        self.encoder = nn.Embedding(ntoken, ninp)
        self.ninp = ninp
        self.init_weights()
    def init_weights(self):
        initrange = 0.1
        self.encoder.weight.data.uniform_(-initrange, initrange)
    def forward(self, src):
        # Need (S, N) format for encoder.
        src = src.t()
        src = self.encoder(src) * math.sqrt(self.ninp)
        return self.pos_encoder(src)
class Decoder(nn.Module):
    def __init__(self, ntoken, ninp):
        super(Decoder, self).__init__()
        self.decoder = nn.Linear(ninp, ntoken)
        self.init_weights()
    def init_weights(self):
        initrange = 0.1
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-initrange, initrange)
    def forward(self, inp):
        # Need batch dimension first for output of pipeline.
        return self.decoder(inp).permute(1, 0, 2) 

启动多个进程进行训练

我们启动两个进程,每个进程在两个 GPU 上驱动自己的管道。对于每个进程,都会执行run_worker

def run_worker(rank, world_size): 

PyTorch 2.2 中文官方教程(十九)(3)https://developer.aliyun.com/article/1482623

相关实践学习
部署Stable Diffusion玩转AI绘画(GPU云服务器)
本实验通过在ECS上从零开始部署Stable Diffusion来进行AI绘画创作,开启AIGC盲盒。
相关文章
|
18天前
|
存储 物联网 PyTorch
基于PyTorch的大语言模型微调指南:Torchtune完整教程与代码示例
**Torchtune**是由PyTorch团队开发的一个专门用于LLM微调的库。它旨在简化LLM的微调流程,提供了一系列高级API和预置的最佳实践
125 59
基于PyTorch的大语言模型微调指南:Torchtune完整教程与代码示例
|
3天前
|
并行计算 监控 搜索推荐
使用 PyTorch-BigGraph 构建和部署大规模图嵌入的完整教程
当处理大规模图数据时,复杂性难以避免。PyTorch-BigGraph (PBG) 是一款专为此设计的工具,能够高效处理数十亿节点和边的图数据。PBG通过多GPU或节点无缝扩展,利用高效的分区技术,生成准确的嵌入表示,适用于社交网络、推荐系统和知识图谱等领域。本文详细介绍PBG的设置、训练和优化方法,涵盖环境配置、数据准备、模型训练、性能优化和实际应用案例,帮助读者高效处理大规模图数据。
24 5
|
3月前
|
并行计算 Ubuntu PyTorch
Ubuntu下CUDA、Conda、Pytorch联合教程
本文是一份Ubuntu系统下安装和配置CUDA、Conda和Pytorch的教程,涵盖了查看显卡驱动、下载安装CUDA、添加环境变量、卸载CUDA、Anaconda的下载安装、环境管理以及Pytorch的安装和验证等步骤。
526 1
Ubuntu下CUDA、Conda、Pytorch联合教程
|
6月前
|
PyTorch 算法框架/工具 异构计算
PyTorch 2.2 中文官方教程(十九)(1)
PyTorch 2.2 中文官方教程(十九)
133 1
PyTorch 2.2 中文官方教程(十九)(1)
|
6月前
|
机器学习/深度学习 PyTorch 算法框架/工具
PyTorch 2.2 中文官方教程(十八)(4)
PyTorch 2.2 中文官方教程(十八)
102 1
|
6月前
|
PyTorch 算法框架/工具 异构计算
PyTorch 2.2 中文官方教程(二十)(4)
PyTorch 2.2 中文官方教程(二十)
120 0
PyTorch 2.2 中文官方教程(二十)(4)
|
6月前
|
Android开发 PyTorch 算法框架/工具
PyTorch 2.2 中文官方教程(二十)(2)
PyTorch 2.2 中文官方教程(二十)
104 0
PyTorch 2.2 中文官方教程(二十)(2)
|
6月前
|
iOS开发 PyTorch 算法框架/工具
PyTorch 2.2 中文官方教程(二十)(1)
PyTorch 2.2 中文官方教程(二十)
106 0
PyTorch 2.2 中文官方教程(二十)(1)
|
6月前
|
PyTorch 算法框架/工具 异构计算
PyTorch 2.2 中文官方教程(十九)(3)
PyTorch 2.2 中文官方教程(十九)
58 0
PyTorch 2.2 中文官方教程(十九)(3)
|
6月前
|
PyTorch 算法框架/工具 并行计算
PyTorch 2.2 中文官方教程(二十)(3)
PyTorch 2.2 中文官方教程(二十)
166 0