❤️ 如果你也关注 AI 的发展现状,且对 AI 应用开发感兴趣,我会每日分享大模型与 AI 领域的开源项目和应用,提供运行实例和实用教程,帮助你快速上手AI技术!
🥦 AI 在线答疑 -> 智能检索历史文章和开源项目 -> 尽在微信公众号 -> 搜一搜:蚝油菜花 🥦
🎧 “训练效率翻倍!DeepSeek 开源双向流水线并行技术,大幅降低内存峰值需求”
大家好,我是蚝油菜花。你是否也遇到过——
- 👉 大规模模型训练时,计算资源利用率低,训练时间长
- 👉 内存峰值需求高,硬件资源受限,无法训练更大规模的模型
- 👉 分布式训练中通信开销大,流水线停滞现象严重...
今天揭秘的 DeepSeek DualPipe ,用三大颠覆性设计重构并行训练,彻底颠覆大规模深度学习模型的训练方式:
- ✅ 双向流水线:前向/反向传播双管道并行,消灭60%等待气泡
- ✅ 内存错峰调度:峰值内存直降47%,轻松训练更大模型
- ✅ 零通信冗余:梯度同步与计算完美重叠,吞吐量提升216%
通过将前向计算和反向计算解耦为两个独立的管道,并行执行,显著减少了流水线停滞现象,实现了计算与通信的重叠。工程师们已经用它加速模型训练,降低硬件资源需求——你的模型准备好迎接效率革命了吗?
🚀 快速阅读
DualPipe 是 DeepSeek 开源的双向流水线并行技术,主要用于提升大规模深度学习模型的训练效率。
- 核心功能:通过将前向计算和反向计算解耦为两个独立的管道,并行执行,显著减少流水线停滞现象。
- 技术原理:优化调度策略,实现计算与通信的完全重叠,降低内存峰值需求。
DualPipe 是什么
DualPipe 是 DeepSeek 开源的创新的双向流水线并行技术,主要用于提升大规模深度学习模型的训练效率。核心思想是将模型的训练过程分为两个独立的管道——前向计算管道和反向计算管道,并行执行。前向计算管道负责模型的前向传播,逐层处理输入数据生成预测结果。反向计算管道负责反向传播,计算预测结果与真实标签之间的误差,生成梯度用于参数更新。
DualPipe 通过优化通信机制和调度策略,进一步减少了分布式训练中的通信开销。这种设计不仅提高了计算资源的利用率,还显著降低了训练过程中的内存峰值需求,使得在有限的硬件资源下可以训练更大规模的模型。
DualPipe 的主要功能
- 大规模模型训练:DualPipe 技术通过将模型的前向传播和反向传播解耦为两个独立的管道,并行执行,显著减少了流水线停滞现象(即“气泡”),实现了计算与通信的重叠。在大规模分布式训练中,计算资源的利用率大幅提高,训练速度显著加快。
DualPipe 的技术原理
- 双向流水线设计:DualPipe 将模型的前向传播和反向传播分解为两个独立的管道,并行执行。前向管道负责模型的预测输出,反向管道负责计算梯度。通过这种解耦方式,DualPipe 实现了计算的并行化。
- 计算与通信重叠:DualPipe 通过优化调度,实现了前向和反向计算与通信的完全重叠,减少了流水线中的空闲时间(气泡),显著提高了资源利用率。
- 内存优化:由于前向和反向计算可以错峰执行,DualPipe 有效降低了训练过程中的内存峰值需求,在有限的硬件资源下可以训练更大规模的模型。
DualPipe 的管道气泡和内存使用比较
- 𝐹:表示前向块执行时间
- 𝐵:表示完整反向块执行时间
- 𝑊:表示“反向权重”块执行时间
- 𝐹&𝐵:表示两个相互重叠的前向和反向块执行时间。
如何运行 DualPipe
运行要求
- PyTorch 2.0 及以上
快速开始!
以下示例展示了DualPipe的使用方法:
from typing import List, Optional, Callable, Tuple
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.distributed as dist
from dualpipe import DualPipe, set_p2p_tensor_shapes, set_p2p_tensor_dtype
from dualpipe.utils import WeightGradStore, run_backward
class LinearFunc(torch.autograd.Function):
@staticmethod
def forward(ctx, input, weight):
ctx.save_for_backward(input, weight)
output = F.linear(input, weight)
return output
@staticmethod
def backward(ctx, grad_output):
input, weight = ctx.saved_tensors
if weight.grad is None:
weight.grad = torch.zeros_like(weight)
def grad_weight_fn():
weight.grad += grad_output.flatten(0, -2).T @ input.flatten(0, -2)
if WeightGradStore.enabled:
WeightGradStore.put(grad_weight_fn)
else:
grad_weight_fn()
grad_input = grad_output @ weight
return grad_input, None
class MyLinear(nn.Linear):
def forward(self, input: torch.Tensor) -> torch.Tensor:
return LinearFunc.apply(input, self.weight)
class PipelineStage(nn.Module):
def __init__(self, hidden_size: int) -> None:
super().__init__()
self.linear1 = MyLinear(hidden_size, hidden_size * 4, bias=False)
self.linear2 = MyLinear(hidden_size * 4, hidden_size, bias=False)
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = self.linear1(x)
x = F.gelu(x)
x = self.linear2(x)
return x
@classmethod
def overlaped_forward_backward(
cls,
module0: "PipelineStage",
inputs0: List[torch.Tensor],
criterion0: Optional[Callable],
labels0: Optional[List[torch.Tensor]],
module1: "PipelineStage",
loss1: Optional[torch.Tensor],
outputs1: Optional[List[torch.Tensor]],
output_grads1: Optional[List[torch.Tensor]],
) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
"""
You should implement custom forward-backward overlap strategy.
The code below is just an example.
"""
outputs0 = module0(*inputs0)
outputs0 = [outputs0] if isinstance(outputs0, torch.Tensor) else outputs0
if criterion0 is not None:
loss0 = criterion0(*outputs0, *labels0)
else:
loss0 = None
if loss1 is not None:
loss1.backward()
loss1.detach_()
else:
run_backward(outputs1, output_grads1)
return outputs0, loss0
def criterion(output: torch.Tensor, target: torch.Tensor) -> torch.Tensor:
return F.mse_loss(output, target).clone()
def ref_step(x, l, model, chunks):
ys, losses = [], []
for micro_x, micro_l in zip(x.chunk(chunks), l.chunk(chunks)):
micro_y = model(micro_x)
loss = criterion(micro_y, micro_l)
loss.backward()
ys.append(micro_y)
losses.append(loss)
y = torch.cat(ys, 0)
loss = torch.stack(losses)
return loss, y
def cal_diff(x: torch.Tensor, y: torch.Tensor) -> float:
x, y = x.double(), y.double()
cos_diff = 1 - 2 * (x * y).sum().item() / (x * x + y * y).sum().item()
return cos_diff
def main(rank, pp_size):
is_first_rank = rank == 0
is_last_rank = rank == pp_size - 1
dist.init_process_group(backend='nccl', init_method="env://", world_size=pp_size, rank=rank)
torch.cuda.set_device(rank)
torch.set_default_device(f"cuda:{rank}")
torch.manual_seed(233)
os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":4096:8"
num_chunks = 20
micro_batch_size = 3
seq_len = 256
hidden_size = 512
if is_first_rank:
print(f"{pp_size=}, {num_chunks=}, {seq_len=}, {hidden_size=}", flush=True)
set_p2p_tensor_shapes([(micro_batch_size, seq_len, hidden_size)])
set_p2p_tensor_dtype(torch.float32)
# Create a model and partition it for each process
full_modules = nn.Sequential(*[PipelineStage(hidden_size) for _ in range(pp_size)])
# Full inputs
full_x = torch.randn(num_chunks * micro_batch_size, seq_len, hidden_size)
full_l = torch.randn(num_chunks * micro_batch_size, seq_len, hidden_size)
# Reference step
loss_ref, output_ref = ref_step(full_x, full_l, full_modules, num_chunks)
# DualPipe
local_full_modules = nn.Sequential(full_modules[rank], full_modules[pp_size - 1 - rank])
local_modules = nn.Sequential(PipelineStage(hidden_size), PipelineStage(hidden_size))
local_modules[0].load_state_dict(local_full_modules[0].state_dict())
local_modules[1].load_state_dict(local_full_modules[1].state_dict())
dualpipe_model = DualPipe(local_modules)
# DualPipe inputs
if is_first_rank:
x = full_x.chunk(2)[0]
l = full_l.chunk(2)[1]
elif is_last_rank:
x = full_x.chunk(2)[1]
l = full_l.chunk(2)[0]
else:
x = None
l = None
# Training step
loss, outputs = dualpipe_model.step(x, num_chunks=num_chunks, criterion=criterion, labels=(l,), return_outputs=False)
# Check loss
if is_first_rank:
assert torch.equal(loss, loss_ref.chunk(2)[1])
elif is_last_rank:
assert torch.equal(loss, loss_ref.chunk(2)[0])
else:
assert loss is None
assert outputs is None
# Check grads
for (p0, p1) in zip(local_modules[0].parameters(), local_modules[1].parameters()):
p0all = torch.empty(pp_size, *p0.shape)
p1all = torch.empty(pp_size, *p1.shape)
dist.all_gather_into_tensor(p0all, p0.grad)
dist.all_gather_into_tensor(p1all, p1.grad)
p0.grad += p1all[pp_size - 1 - rank]
p1.grad += p0all[pp_size - 1 - rank]
for ((n, p), p_ref) in zip(local_modules.named_parameters(), local_full_modules.parameters()):
assert cal_diff(p.grad, p_ref.grad) < 1e-13
dualpipe_model.zero_grad()
# Inference step
with torch.no_grad():
loss, outputs = dualpipe_model.step(x, num_chunks=num_chunks, criterion=criterion, labels=(l,), return_outputs=True)
# Check loss and outputs
if is_first_rank:
assert torch.equal(loss, loss_ref.chunk(2)[1])
assert torch.equal(outputs, output_ref.chunk(2)[1])
elif is_last_rank:
assert torch.equal(loss, loss_ref.chunk(2)[0])
assert torch.equal(outputs, output_ref.chunk(2)[0])
else:
assert loss is None
assert outputs is None
def test_dualpipe(ngpus):
torch.multiprocessing.spawn(main, args=(ngpus, ), nprocs=ngpus, daemon=True)
if __name__ == "__main__":
num_gpus = torch.cuda.device_count() // 2 * 2
for ngpus in range(num_gpus, 0, -2):
test_dualpipe(ngpus)
注意:对于实际应用,您需要实现一个定制的 overlapped_forward_backward
方法,以适应您特定的模块。
资源
- GitHub 仓库:https://github.com/deepseek-ai/DualPipe
❤️ 如果你也关注 AI 的发展现状,且对 AI 应用开发感兴趣,我会每日分享大模型与 AI 领域的开源项目和应用,提供运行实例和实用教程,帮助你快速上手AI技术!
🥦 AI 在线答疑 -> 智能检索历史文章和开源项目 -> 尽在微信公众号 -> 搜一搜:蚝油菜花 🥦