PyTorch 2.2 中文官方教程(十八)(1)https://developer.aliyun.com/article/1482603
混合精度
FSDP 支持灵活的混合精度训练,允许使用任意降低精度类型(如 fp16 或 bfloat16)。目前,BFloat16 仅在安培 GPU 上可用,因此在使用之前需要确认是否有本机支持。例如,在 V100 上,仍然可以运行 BFloat16,但由于它是非本机运行,可能会导致显著的减速。
要检查是否原生支持 BFloat16,您可以使用以下方法:
bf16_ready = ( torch.version.cuda and torch.cuda.is_bf16_supported() and LooseVersion(torch.version.cuda) >= "11.0" and dist.is_nccl_available() and nccl.version() >= (2, 10) )
在 FSDP 中混合精度的一个优点是为参数、梯度和缓冲区提供不同精度级别的细粒度控制。
fpSixteen = MixedPrecision( param_dtype=torch.float16, # Gradient communication precision. reduce_dtype=torch.float16, # Buffer precision. buffer_dtype=torch.float16, ) bfSixteen = MixedPrecision( param_dtype=torch.bfloat16, # Gradient communication precision. reduce_dtype=torch.bfloat16, # Buffer precision. buffer_dtype=torch.bfloat16, ) fp32_policy = MixedPrecision( param_dtype=torch.float32, # Gradient communication precision. reduce_dtype=torch.float32, # Buffer precision. buffer_dtype=torch.float32, )
请注意,如果某种类型(参数、减少、缓冲区)未指定,则它们将不会被转换。
这种灵活性使用户可以进行精细的控制,比如只将梯度通信设置为以降低精度进行,而所有参数/缓冲计算则以全精度进行。在节点内通信是主要瓶颈且参数/缓冲必须以全精度进行以避免精度问题的情况下,这种方法可能非常有用。可以使用以下策略来实现:
grad_bf16 = MixedPrecision(reduce_dtype=torch.bfloat16)
在 2.4 版本中,我们只需将相关的混合精度策略添加到 FSDP 包装器中:
model = FSDP(model, auto_wrap_policy=t5_auto_wrap_policy, mixed_precision=bfSixteen)
在我们的实验中,我们观察到使用 BFloat16 进行训练可以加快速度达到 4 倍,并且在一些实验中可以减少大约 30%的内存,这可以用于增加批量大小。
在设备上初始化 FSDP 模型
在 1.12 版本中,FSDP 支持一个 device_id 参数,旨在初始化设备上的输入 CPU 模块。当整个模型无法适应单个 GPU,但适应主机的 CPU 内存时,这将非常有用。当指定 device_id 时,FSDP 将根据每个 FSDP 单元将模型移动到指定的设备上,避免 GPU 内存不足问题,同时初始化速度比基于 CPU 的初始化快数倍。
torch.cuda.set_device(local_rank) model = FSDP(model, auto_wrap_policy=t5_auto_wrap_policy, mixed_precision=bfSixteen, device_id=torch.cuda.current_device())
分片策略
默认情况下,FSDP 分片策略被设置为完全分片模型参数,梯度和优化器状态在所有等级之间分片(也称为 Zero3 分片)。如果您希望使用 Zero2 分片策略,仅对优化器状态和梯度进行分片,FSDP 支持通过将分片策略传递给 FSDP 初始化来实现此功能,如下所示:“ShardingStrategy.SHARD_GRAD_OP”,而不是“ShardingStrategy.FULL_SHARD”。
torch.cuda.set_device(local_rank) model = FSDP(model, auto_wrap_policy=t5_auto_wrap_policy, mixed_precision=bfSixteen, device_id=torch.cuda.current_device(), sharding_strategy=ShardingStrategy.SHARD_GRAD_OP # ZERO2)
这将减少 FSDP 中的通信开销,在这种情况下,在前向传播和反向传播后保持完整的参数。
在反向传播过程中,这样做可以节省一次全局聚合操作,从而减少通信量,但会增加内存占用。请注意,完整的模型参数会在反向传播结束时被释放,全局聚合操作将在下一次前向传播中进行。
向后预取
后向预取设置控制了何时应请求下一个 FSDP 单元的参数。通过将其设置为 BACKWARD_PRE,下一个 FSDP 单元的参数可以在当前单元的计算开始之前开始请求并到达。这会重叠所有收集通信和梯度计算,可以增加训练速度,但会略微增加内存消耗。可以在 2.4 版本中的 FSDP 包装器中利用它。
torch.cuda.set_device(local_rank) model = FSDP(model, auto_wrap_policy=t5_auto_wrap_policy, mixed_precision=bfSixteen, device_id=torch.cuda.current_device(), backward_prefetch = BackwardPrefetch.BACKWARD_PRE)
backward_prefetch 有两种模式,BACKWARD_PRE 和 BACKWARD_POST。BACKWARD_POST 意味着直到当前 FSDP 单元处理完成之前,不会请求下一个 FSDP 单元的参数,从而最大限度地减少内存开销。在某些情况下,使用 BACKWARD_PRE 可以将模型训练速度提高 2-10%,对于更大的模型,速度提高更为显著。
模型检查点保存,通过流式传输到 Rank0 CPU。
使用 FULL_STATE_DICT 保存模型检查点,该保存方式与本地模型相同,PyTorch 1.12 提供了一些实用工具来支持保存更大的模型。
首先,可以指定一个 FullStateDictConfig,允许仅在 rank 0 上填充 state_dict 并转移到 CPU。
在使用这种配置时,FSDP 将会收集模型参数,逐个将其转移到 CPU 上,仅在 rank 0 上进行。当 state_dict 最终保存时,它只会在 rank 0 上填充,并包含 CPU 张量。这避免了对于大于单个 GPU 内存的模型可能出现的 OOM,并允许用户对模型进行检查点,其大小大致等于用户机器上可用的 CPU RAM。
这个功能可以按照以下方式运行:
save_policy = FullStateDictConfig(offload_to_cpu=True, rank0_only=True) with FSDP.state_dict_type( model, StateDictType.FULL_STATE_DICT, save_policy ): cpu_state = model.state_dict() if rank == 0: save_name = file_save_name + "-" + time_of_run + "-" + currEpoch torch.save(cpu_state, save_name)
摘要
在本教程中,我们介绍了 Pytorch 1.12 中可用的许多 FSDP 的新功能,并以 HF T5 作为运行示例。特别是对于变压器模型,使用适当的包装策略,以及混合精度和向后预取应该可以加快您的训练速度。此外,诸如在设备上初始化模型和通过流式传输到 CPU 保存检查点等功能应该有助于避免处理大型模型时的 OOM 错误。
我们正在积极努力为下一个版本的 FSDP 添加新功能。如果您有反馈、功能请求、问题或在使用 FSDP 时遇到问题,请随时通过在PyTorch Github 存储库中打开问题与我们联系。
使用 Cpp 扩展自定义流程组后端
原文:
pytorch.org/tutorials/intermediate/process_group_cpp_extension_tutorial.html
译者:飞龙
作者:Howard Huang https://github.com/H-Huang,Feng Tian,Shen Li,Min Si
注意
在 github 上查看并编辑本教程。
先决条件:
- PyTorch 分布式概述
- PyTorch 集体通信包
- PyTorch Cpp 扩展
- 使用 PyTorch 编写分布式应用程序
本教程演示了如何实现一个自定义的Backend
并将其插入PyTorch 分布式包,使用cpp 扩展。当您需要为硬件定制专门的软件堆栈,或者想要尝试新的集体通信算法时,这将非常有帮助。
基础知识
PyTorch 集体通信支持多种广泛采用的分布式训练功能,包括DistributedDataParallel,ZeroRedundancyOptimizer,FullyShardedDataParallel。为了使相同的集体通信 API 能够与不同的通信后端一起工作,分布式包将集体通信操作抽象为Backend类。不同的后端可以作为Backend
的子类使用首选的第三方库来实现。PyTorch 分布式带有三个默认后端,ProcessGroupNCCL
,ProcessGroupGloo
和ProcessGroupMPI
。然而,除了这三个后端之外,还有其他通信库(例如UCC,OneCCL),不同类型的硬件(例如TPU,Trainum)和新兴的通信算法(例如Herring,Reduction Server)。因此,分布式包提供了扩展 API 来允许定制集体通信后端。
以下 4 个步骤展示了如何在 Python 应用程序代码中实现一个虚拟的Backend
后端并使用它。请注意,本教程侧重于演示扩展 API,而不是开发一个功能完善的通信后端。因此,dummy
后端只涵盖了 API 的一个子集(all_reduce
和all_gather
),并且只是将张量的值设置为 0。
步骤 1:实现Backend
的子类
第一步是实现一个Backend
子类,覆盖目标集体通信 API,并运行自定义通信算法。扩展还需要实现一个Work
子类,作为通信结果的 future,并允许在应用代码中异步执行。如果扩展使用第三方库,可以在BackendDummy
子类中包含头文件并调用库 API。下面的两个代码片段展示了dummy.h
和dummy.cpp
的实现。请查看dummy collectives存储库以获取完整的实现。
// file name: dummy.hpp #include <torch/python.h> #include <torch/csrc/distributed/c10d/Backend.hpp> #include <torch/csrc/distributed/c10d/Work.hpp> #include <torch/csrc/distributed/c10d/Store.hpp> #include <torch/csrc/distributed/c10d/Types.hpp> #include <torch/csrc/distributed/c10d/Utils.hpp> #include <pybind11/chrono.h> namespace c10d { class BackendDummy : public Backend { public: BackendDummy(int rank, int size); c10::intrusive_ptr<Work> allgather( std::vector<std::vector<at::Tensor>>& outputTensors, std::vector<at::Tensor>& inputTensors, const AllgatherOptions& opts = AllgatherOptions()) override; c10::intrusive_ptr<Work> allreduce( std::vector<at::Tensor>& tensors, const AllreduceOptions& opts = AllreduceOptions()) override; // The collective communication APIs without a custom implementation // will error out if invoked by application code. }; class WorkDummy : public Work { public: WorkDummy( OpType opType, c10::intrusive_ptr<c10::ivalue::Future> future) // future of the output : Work( -1, // rank, only used by recvAnySource, irrelevant in this demo opType), future_(std::move(future)) {} bool isCompleted() override; bool isSuccess() const override; bool wait(std::chrono::milliseconds timeout = kUnsetTimeout) override; virtual c10::intrusive_ptr<c10::ivalue::Future> getFuture() override; private: c10::intrusive_ptr<c10::ivalue::Future> future_; }; } // namespace c10d
// file name: dummy.cpp #include "dummy.hpp" namespace c10d { // This is a dummy allgather that sets all output tensors to zero // Modify the implementation to conduct real communication asynchronously c10::intrusive_ptr<Work> BackendDummy::allgather( std::vector<std::vector<at::Tensor>>& outputTensors, std::vector<at::Tensor>& inputTensors, const AllgatherOptions& /* unused */) { for (auto& outputTensorVec : outputTensors) { for (auto& outputTensor : outputTensorVec) { outputTensor.zero_(); } } auto future = c10::make_intrusive<c10::ivalue::Future>( c10::ListType::create(c10::ListType::create(c10::TensorType::get()))); future->markCompleted(c10::IValue(outputTensors)); return c10::make_intrusive<WorkDummy>(OpType::ALLGATHER, std::move(future)); } // This is a dummy allreduce that sets all output tensors to zero // Modify the implementation to conduct real communication asynchronously c10::intrusive_ptr<Work> BackendDummy::allreduce( std::vector<at::Tensor>& tensors, const AllreduceOptions& opts) { for (auto& tensor : tensors) { tensor.zero_(); } auto future = c10::make_intrusive<c10::ivalue::Future>( c10::ListType::create(c10::TensorType::get())); future->markCompleted(c10::IValue(tensors)); return c10::make_intrusive<WorkDummy>(OpType::ALLGATHER, std::move(future)); } } // namespace c10d
步骤 2:暴露扩展 Python API
后端构造函数是从 Python 端调用的,因此扩展还需要向 Python 公开构造函数 API。这可以通过添加以下方法来实现。在这个例子中,store
和timeout
被BackendDummy
实例化方法忽略,因为在这个虚拟实现中没有使用它们。然而,真实世界的扩展应该考虑使用store
来执行会合并支持timeout
参数。
// file name: dummy.hpp class BackendDummy : public Backend { ... <Step 1 code> ... static c10::intrusive_ptr<Backend> createBackendDummy( const c10::intrusive_ptr<::c10d::Store>& store, int rank, int size, const std::chrono::duration<float>& timeout); static void BackendDummyConstructor() __attribute__((constructor)) { py::object module = py::module::import("torch.distributed"); py::object register_backend = module.attr("Backend").attr("register_backend"); // torch.distributed.Backend.register_backend will add `dummy` as a // new valid backend. register_backend("dummy", py::cpp_function(createBackendDummy)); } }
// file name: dummy.cpp c10::intrusive_ptr<Backend> BackendDummy::createBackendDummy( const c10::intrusive_ptr<::c10d::Store>& /* unused */, int rank, int size, const std::chrono::duration<float>& /* unused */) { return c10::make_intrusive<BackendDummy>(rank, size); } PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) { m.def("createBackendDummy", &BackendDummy::createBackendDummy); }
步骤 3:构建自定义扩展
现在,扩展源代码文件已经准备好。我们可以使用cpp extensions来构建它。为此,创建一个setup.py
文件,准备路径和命令。然后调用python setup.py develop
来安装扩展。
如果扩展依赖于第三方库,您还可以在 cpp 扩展 API 中指定libraries_dirs
和libraries
。请参考torch ucc项目作为一个真实的例子。
# file name: setup.py import os import sys import torch from setuptools import setup from torch.utils import cpp_extension sources = ["src/dummy.cpp"] include_dirs = [f"{os.path.dirname(os.path.abspath(__file__))}/include/"] if torch.cuda.is_available(): module = cpp_extension.CUDAExtension( name = "dummy_collectives", sources = sources, include_dirs = include_dirs, ) else: module = cpp_extension.CppExtension( name = "dummy_collectives", sources = sources, include_dirs = include_dirs, ) setup( name = "Dummy-Collectives", version = "0.0.1", ext_modules = [module], cmdclass={'build_ext': cpp_extension.BuildExtension} )
步骤 4:在应用程序中使用扩展。
安装完成后,您可以在调用init_process_group时方便地使用dummy
后端,就像它是一个内置后端一样。
我们可以根据后端来指定调度,方法是改变init_process_group
的backend
参数。我们可以通过将后端参数指定为cpu:gloo,cuda:dummy
,将 CPU 张量的集体分发到gloo
后端,将 CUDA 张量的集体分发到dummy
后端。
要将所有张量发送到dummy
后端,我们可以简单地将dummy
指定为后端参数。
import os import torch # importing dummy_collectives makes torch.distributed recognize `dummy` # as a valid backend. import dummy_collectives import torch.distributed as dist os.environ['MASTER_ADDR'] = 'localhost' os.environ['MASTER_PORT'] = '29500' # Alternatively: # dist.init_process_group("dummy", rank=0, world_size=1) dist.init_process_group("cpu:gloo,cuda:dummy", rank=0, world_size=1) # this goes through gloo x = torch.ones(6) dist.all_reduce(x) print(f"cpu allreduce: {x}") # this goes through dummy if torch.cuda.is_available(): y = x.cuda() dist.all_reduce(y) print(f"cuda allreduce: {y}") try: dist.broadcast(y, 0) except RuntimeError: print("got RuntimeError when calling broadcast")
PyTorch 2.2 中文官方教程(十八)(3)https://developer.aliyun.com/article/1482608