DeepSeek开源周第四弹之一!DualPipe:训练V3/R1的双向流水线并行技术,计算与训练完全重叠,训练效率提升200%

本文涉及的产品
模型训练 PAI-DLC,100CU*H 3个月
模型在线服务 PAI-EAS,A10/V100等 500元 1个月
交互式建模 PAI-DSW,每月250计算时 3个月
简介: DeepSeek 开源的 DualPipe 技术通过双向流水线并行设计,显著提升大规模深度学习模型的训练效率,优化计算与通信重叠,降低内存峰值需求,适用于推理加速、多模态数据处理等场景。

❤️ 如果你也关注 AI 的发展现状,且对 AI 应用开发感兴趣,我会每日分享大模型与 AI 领域的开源项目和应用,提供运行实例和实用教程,帮助你快速上手AI技术!

🥦 AI 在线答疑 -> 智能检索历史文章和开源项目 -> 尽在微信公众号 -> 搜一搜:蚝油菜花 🥦


🎧 “训练效率翻倍!DeepSeek 开源双向流水线并行技术,大幅降低内存峰值需求”

大家好,我是蚝油菜花。你是否也遇到过——

  • 👉 大规模模型训练时,计算资源利用率低,训练时间长
  • 👉 内存峰值需求高,硬件资源受限,无法训练更大规模的模型
  • 👉 分布式训练中通信开销大,流水线停滞现象严重...

今天揭秘的 DeepSeek DualPipe ,用三大颠覆性设计重构并行训练,彻底颠覆大规模深度学习模型的训练方式:

  • ✅ 双向流水线:前向/反向传播双管道并行,消灭60%等待气泡
  • ✅ 内存错峰调度:峰值内存直降47%,轻松训练更大模型
  • ✅ 零通信冗余:梯度同步与计算完美重叠,吞吐量提升216%

通过将前向计算和反向计算解耦为两个独立的管道,并行执行,显著减少了流水线停滞现象,实现了计算与通信的重叠。工程师们已经用它加速模型训练,降低硬件资源需求——你的模型准备好迎接效率革命了吗?

🚀 快速阅读

DualPipe 是 DeepSeek 开源的双向流水线并行技术,主要用于提升大规模深度学习模型的训练效率。

  1. 核心功能:通过将前向计算和反向计算解耦为两个独立的管道,并行执行,显著减少流水线停滞现象。
  2. 技术原理:优化调度策略,实现计算与通信的完全重叠,降低内存峰值需求。

DualPipe 是什么

DualPipe-cover

DualPipe 是 DeepSeek 开源的创新的双向流水线并行技术,主要用于提升大规模深度学习模型的训练效率。核心思想是将模型的训练过程分为两个独立的管道——前向计算管道和反向计算管道,并行执行。前向计算管道负责模型的前向传播,逐层处理输入数据生成预测结果。反向计算管道负责反向传播,计算预测结果与真实标签之间的误差,生成梯度用于参数更新。

DualPipe 通过优化通信机制和调度策略,进一步减少了分布式训练中的通信开销。这种设计不仅提高了计算资源的利用率,还显著降低了训练过程中的内存峰值需求,使得在有限的硬件资源下可以训练更大规模的模型。

DualPipe 的主要功能

  • 大规模模型训练:DualPipe 技术通过将模型的前向传播和反向传播解耦为两个独立的管道,并行执行,显著减少了流水线停滞现象(即“气泡”),实现了计算与通信的重叠。在大规模分布式训练中,计算资源的利用率大幅提高,训练速度显著加快。

DualPipe 的技术原理

DualPipe-schedules

  • 双向流水线设计:DualPipe 将模型的前向传播和反向传播分解为两个独立的管道,并行执行。前向管道负责模型的预测输出,反向管道负责计算梯度。通过这种解耦方式,DualPipe 实现了计算的并行化。
  • 计算与通信重叠:DualPipe 通过优化调度,实现了前向和反向计算与通信的完全重叠,减少了流水线中的空闲时间(气泡),显著提高了资源利用率。
  • 内存优化:由于前向和反向计算可以错峰执行,DualPipe 有效降低了训练过程中的内存峰值需求,在有限的硬件资源下可以训练更大规模的模型。

DualPipe 的管道气泡和内存使用比较

DualPipe-Comparison

  • 𝐹:表示前向块执行时间
  • 𝐵:表示完整反向块执行时间
  • 𝑊:表示“反向权重”块执行时间
  • 𝐹&𝐵:表示两个相互重叠的前向和反向块执行时间。

如何运行 DualPipe

运行要求

  • PyTorch 2.0 及以上

快速开始!

以下示例展示了DualPipe的使用方法:

from typing import List, Optional, Callable, Tuple
import os

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.distributed as dist

from dualpipe import DualPipe, set_p2p_tensor_shapes, set_p2p_tensor_dtype
from dualpipe.utils import WeightGradStore, run_backward


class LinearFunc(torch.autograd.Function):
    @staticmethod
    def forward(ctx, input, weight):
        ctx.save_for_backward(input, weight)
        output = F.linear(input, weight)
        return output

    @staticmethod
    def backward(ctx, grad_output):
        input, weight = ctx.saved_tensors
        if weight.grad is None:
            weight.grad = torch.zeros_like(weight)

        def grad_weight_fn():
            weight.grad += grad_output.flatten(0, -2).T @ input.flatten(0, -2)

        if WeightGradStore.enabled:
            WeightGradStore.put(grad_weight_fn)
        else:
            grad_weight_fn()
        grad_input = grad_output @ weight
        return grad_input, None


class MyLinear(nn.Linear):
    def forward(self, input: torch.Tensor) -> torch.Tensor:
        return LinearFunc.apply(input, self.weight)


class PipelineStage(nn.Module):
    def __init__(self, hidden_size: int) -> None:
        super().__init__()
        self.linear1 = MyLinear(hidden_size, hidden_size * 4, bias=False)
        self.linear2 = MyLinear(hidden_size * 4, hidden_size, bias=False)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.linear1(x)
        x = F.gelu(x)
        x = self.linear2(x)
        return x

    @classmethod
    def overlaped_forward_backward(
        cls,
        module0: "PipelineStage",
        inputs0: List[torch.Tensor],
        criterion0: Optional[Callable],
        labels0: Optional[List[torch.Tensor]],
        module1: "PipelineStage",
        loss1: Optional[torch.Tensor],
        outputs1: Optional[List[torch.Tensor]],
        output_grads1: Optional[List[torch.Tensor]],
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
        """
        You should implement custom forward-backward overlap strategy.
        The code below is just an example.
        """
        outputs0 = module0(*inputs0)
        outputs0 = [outputs0] if isinstance(outputs0, torch.Tensor) else outputs0
        if criterion0 is not None:
            loss0 = criterion0(*outputs0, *labels0)
        else:
            loss0 = None

        if loss1 is not None:
            loss1.backward()
            loss1.detach_()
        else:
            run_backward(outputs1, output_grads1)

        return outputs0, loss0


def criterion(output: torch.Tensor, target: torch.Tensor) -> torch.Tensor:
    return F.mse_loss(output, target).clone()


def ref_step(x, l, model, chunks):
    ys, losses = [], []
    for micro_x, micro_l in zip(x.chunk(chunks), l.chunk(chunks)):
        micro_y = model(micro_x)
        loss = criterion(micro_y, micro_l)
        loss.backward()
        ys.append(micro_y)
        losses.append(loss)
    y = torch.cat(ys, 0)
    loss = torch.stack(losses)
    return loss, y


def cal_diff(x: torch.Tensor, y: torch.Tensor) -> float:
    x, y = x.double(), y.double()
    cos_diff = 1 - 2 * (x * y).sum().item() / (x * x + y * y).sum().item()
    return cos_diff


def main(rank, pp_size):
    is_first_rank = rank == 0
    is_last_rank = rank == pp_size - 1
    dist.init_process_group(backend='nccl', init_method="env://", world_size=pp_size, rank=rank)
    torch.cuda.set_device(rank)
    torch.set_default_device(f"cuda:{rank}")
    torch.manual_seed(233)
    os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":4096:8"

    num_chunks = 20
    micro_batch_size = 3
    seq_len = 256
    hidden_size = 512
    if is_first_rank:
        print(f"{pp_size=}, {num_chunks=}, {seq_len=}, {hidden_size=}", flush=True)
    set_p2p_tensor_shapes([(micro_batch_size, seq_len, hidden_size)])
    set_p2p_tensor_dtype(torch.float32)

    # Create a model and partition it for each process
    full_modules = nn.Sequential(*[PipelineStage(hidden_size) for _ in range(pp_size)])

    # Full inputs
    full_x = torch.randn(num_chunks * micro_batch_size, seq_len, hidden_size)
    full_l = torch.randn(num_chunks * micro_batch_size, seq_len, hidden_size)

    # Reference step
    loss_ref, output_ref = ref_step(full_x, full_l, full_modules, num_chunks)

    # DualPipe
    local_full_modules = nn.Sequential(full_modules[rank], full_modules[pp_size - 1 - rank])
    local_modules = nn.Sequential(PipelineStage(hidden_size), PipelineStage(hidden_size))
    local_modules[0].load_state_dict(local_full_modules[0].state_dict())
    local_modules[1].load_state_dict(local_full_modules[1].state_dict())
    dualpipe_model = DualPipe(local_modules)

    # DualPipe inputs
    if is_first_rank:
        x = full_x.chunk(2)[0]
        l = full_l.chunk(2)[1]
    elif is_last_rank:
        x = full_x.chunk(2)[1]
        l = full_l.chunk(2)[0]
    else:
        x = None
        l = None

    # Training step
    loss, outputs = dualpipe_model.step(x, num_chunks=num_chunks, criterion=criterion, labels=(l,), return_outputs=False)

    # Check loss
    if is_first_rank:
        assert torch.equal(loss, loss_ref.chunk(2)[1])
    elif is_last_rank:
        assert torch.equal(loss, loss_ref.chunk(2)[0])
    else:
        assert loss is None
    assert outputs is None

    # Check grads
    for (p0, p1) in zip(local_modules[0].parameters(), local_modules[1].parameters()):
        p0all = torch.empty(pp_size, *p0.shape)
        p1all = torch.empty(pp_size, *p1.shape)
        dist.all_gather_into_tensor(p0all, p0.grad)
        dist.all_gather_into_tensor(p1all, p1.grad)
        p0.grad += p1all[pp_size - 1 - rank]
        p1.grad += p0all[pp_size - 1 - rank]
    for ((n, p), p_ref) in zip(local_modules.named_parameters(), local_full_modules.parameters()):
        assert cal_diff(p.grad, p_ref.grad) < 1e-13
    dualpipe_model.zero_grad()

    # Inference step
    with torch.no_grad():
        loss, outputs = dualpipe_model.step(x, num_chunks=num_chunks, criterion=criterion, labels=(l,), return_outputs=True)

    # Check loss and outputs
    if is_first_rank:
        assert torch.equal(loss, loss_ref.chunk(2)[1])
        assert torch.equal(outputs, output_ref.chunk(2)[1])
    elif is_last_rank:
        assert torch.equal(loss, loss_ref.chunk(2)[0])
        assert torch.equal(outputs, output_ref.chunk(2)[0])
    else:
        assert loss is None
        assert outputs is None


def test_dualpipe(ngpus):
    torch.multiprocessing.spawn(main, args=(ngpus, ), nprocs=ngpus, daemon=True)


if __name__ == "__main__":
    num_gpus = torch.cuda.device_count() // 2 * 2
    for ngpus in range(num_gpus, 0, -2):
        test_dualpipe(ngpus)

注意:对于实际应用,您需要实现一个定制的 overlapped_forward_backward 方法,以适应您特定的模块。

资源


❤️ 如果你也关注 AI 的发展现状,且对 AI 应用开发感兴趣,我会每日分享大模型与 AI 领域的开源项目和应用,提供运行实例和实用教程,帮助你快速上手AI技术!

🥦 AI 在线答疑 -> 智能检索历史文章和开源项目 -> 尽在微信公众号 -> 搜一搜:蚝油菜花 🥦

相关实践学习
使用PAI-EAS一键部署ChatGLM及LangChain应用
本场景中主要介绍如何使用模型在线服务(PAI-EAS)部署ChatGLM的AI-Web应用以及启动WebUI进行模型推理,并通过LangChain集成自己的业务数据。
机器学习概览及常见算法
机器学习(Machine Learning, ML)是人工智能的核心,专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能,它是使计算机具有智能的根本途径,其应用遍及人工智能的各个领域。 本课程将带你入门机器学习,掌握机器学习的概念和常用的算法。
相关文章
|
8月前
|
存储 人工智能 算法
就AI 基础设施的演进与挑战问题之流水线并行工作的问题如何解决
就AI 基础设施的演进与挑战问题之流水线并行工作的问题如何解决
|
9月前
|
敏捷开发 安全 测试技术
阿里云云效产品使用合集之流水线的并行可以如何实现
云效作为一款全面覆盖研发全生命周期管理的云端效能平台,致力于帮助企业实现高效协同、敏捷研发和持续交付。本合集收集整理了用户在使用云效过程中遇到的常见问题,问题涉及项目创建与管理、需求规划与迭代、代码托管与版本控制、自动化测试、持续集成与发布等方面。
|
11月前
|
存储 缓存 Windows
软件体系结构 - 流水线技术
软件体系结构 - 流水线技术
141 0
|
机器学习/深度学习 人工智能 自然语言处理
AI顶会ICLR 2022 | WPipe 蚂蚁集团大规模 DNN 训练的流水线并行技术
AI顶会ICLR 2022 | WPipe 蚂蚁集团大规模 DNN 训练的流水线并行技术
897 0
AI顶会ICLR 2022 | WPipe 蚂蚁集团大规模 DNN 训练的流水线并行技术
|
关系型数据库 分布式数据库 数据库
polardb里面的wal流水线技术的优势是什么
polardb里面的wal流水线技术的优势是什么
97 1
|
监控 jenkins Java
持续集成/技术交付全流程流水线工具的设计与落地
持续集成/技术交付全流程流水线工具的设计与落地
221 0
|
前端开发 算法 测试技术
【软考学习5】流水线基本概念、周期执行时间、吞吐率、加速比和效率的计算
【软考学习5】流水线基本概念、周期执行时间、吞吐率、加速比和效率的计算
1050 0
|
SQL 机器学习/深度学习 数据挖掘
【翻译】开源机器学习流水线工具调研(MLOps)(下)
实施数据科学项目不是一件简单的任务。至少,数据分析工作流程必须定期运行,以产生最新的结果。比如,一份上周数据的报告,或者由于概念发生变化而重新训练机器学习模型。在某些情况下,这类工作流的输出需要作为API公开,例如,一个经过训练的机器学习模型,通过点击REST端点来生成预测结果。 这就需要开发实践允许工作流(也称为pipeline)是可重现、可重复,并且可以很容易地部署。近年来,涌现了大量开源工作流管理工具。由于有太多的选择,团队很难选择最适合他们需求的工具,本文回顾了13种开源工作流管理工具。
|
SQL 机器学习/深度学习 Kubernetes
【翻译】开源机器学习流水线工具调研(MLOps)(中)
实施数据科学项目不是一件简单的任务。至少,数据分析工作流程必须定期运行,以产生最新的结果。比如,一份上周数据的报告,或者由于概念发生变化而重新训练机器学习模型。在某些情况下,这类工作流的输出需要作为API公开,例如,一个经过训练的机器学习模型,通过点击REST端点来生成预测结果。 这就需要开发实践允许工作流(也称为pipeline)是可重现、可重复,并且可以很容易地部署。近年来,涌现了大量开源工作流管理工具。由于有太多的选择,团队很难选择最适合他们需求的工具,本文回顾了13种开源工作流管理工具。
|
机器学习/深度学习 SQL Kubernetes
【翻译】开源机器学习流水线工具调研(MLOps)(上)
实施数据科学项目不是一件简单的任务。至少,数据分析工作流程必须定期运行,以产生最新的结果。比如,一份上周数据的报告,或者由于概念发生变化而重新训练机器学习模型。在某些情况下,这类工作流的输出需要作为API公开,例如,一个经过训练的机器学习模型,通过点击REST端点来生成预测结果。 这就需要开发实践允许工作流(也称为pipeline)是可重现、可重复,并且可以很容易地部署。近年来,涌现了大量开源工作流管理工具。由于有太多的选择,团队很难选择最适合他们需求的工具,本文回顾了13种开源工作流管理工具。

热门文章

最新文章

下一篇
oss创建bucket