大规模 MLOps 工程(三)(1)https://developer.aliyun.com/article/1517787
7.5 扩展以使用 GPU 核心
在本节中,您将修改列表 7.4 中的基准线性回归实现,以利用计算节点的多个 GPU 核心。 正如您从第 7.4 节中学到的那样,要使您的机器学习代码适应 GPU 的优势,您需要确保通过调用 torch.cuda.is_available() 方法正确配置 PyTorch 的 CUDA 设备和设备驱动程序(列表 7.7 ❶),其中可用设备被分配给设备变量。
列表 7.7 使用线性回归的弱基准
iimport os import torch as pt from torch.utils.data import DataLoader from kaen.torch import ObjectStorageDataset as osds pt.manual_seed(0); pt.set_default_dtype(pt.float64) device = pt.device("cuda" \ if pt.cuda.is_available() else "cpu") ❶ BATCH_SIZE = 1_048_576 # = 2 ** 20 train_ds = osds(f"s3://dc-taxi-{os.environ['BUCKET_ID']}- ➥ {os.environ['AWS_DEFAULT_REGION']}/csv/dev/part*.csv", storage_options = {'anon': False}, batch_size = BATCH_SIZE) train_dl = DataLoader(train_ds, pin_memory = True) ❷ FEATURE_COUNT = 8 w = \ ❷' pt.nn.init.kaiming_uniform_(pt.empty(FEATURE_COUNT, 1, requires_grad=True, device=device))❸ b = \ pt.nn.init.kaiming_uniform_(pt.empty(1, 1, requires_grad = True, device=device))❹ def batchToXy(batch): batch = batch.squeeze_().to(device) ❽ return batch[:, 1:], batch[:, 0] def forward(X): y_pred = X @ w + b return y_pred.squeeze_() def loss(y_est, y): mse_loss = pt.mean((y_est - y) ** 2) return mse_loss LEARNING_RATE = 0.03 optimizer = pt.optim.SGD([w, b], lr = LEARNING_RATE) GRADIENT_NORM = 0.5 ITERATION_COUNT = 50 for iter_idx, batch in zip(range(ITERATION_COUNT), train_dl): ❺ start_ts = time.perf_counter() X, y = batchToXy(batch) y_est = forward(X) mse = loss(y_est, y) mse.backward() pt.nn.utils.clip_grad_norm_([w, b], GRADIENT_NORM) if GRADIENT_NORM else None optimizer.step() optimizer.zero_grad() sec_iter = time.perf_counter() - start_ts print(f"Iteration: {iter_idx:03d}, Seconds/Iteration: {sec_iter:.3f} ➥ MSE: {mse.data.item():.2f}")
❶ 当设备可用时,使用 GPU。
❷ 自定义 DataLoader 以将数据固定在内存中以加速传输。
❷ 初始化模型参数…
❸ 当可用时,将模型偏差设置为使用 GPU 设备,否则使用 CPU。
❽ 当 GPU 设备可用时,将批数据传输到 GPU 设备,否则不执行操作。
剩余的更改在清单 7.7❸—❾中突出显示。请注意,DataLoader 的实例化已更改以利用 pin_memory 参数❸。此参数通过“固定住”操作系统的虚拟内存页面,以防止页面从物理内存交换到存储器,反之亦然,从而帮助加速大张量从 CPU 内存到 GPU 的传输。其余的更改❹—❾只是为了指定正确的设备与 PyTorch 张量一起使用:如果 GPU 可用于 PyTorch 运行时,则为 cuda,否则为 cpu。
运行代码清单 7.7 应该可以通过 MSE 损失来展示弱基线:
Iteration: 040, Seconds/Iteration: 0.009 MSE: 865.98 Iteration: 041, Seconds/Iteration: 0.009 MSE: 36.48 Iteration: 042, Seconds/Iteration: 0.009 MSE: 857.78 Iteration: 043, Seconds/Iteration: 0.009 MSE: 39.33 Iteration: 044, Seconds/Iteration: 0.009 MSE: 868.70 Iteration: 045, Seconds/Iteration: 0.009 MSE: 37.57 Iteration: 046, Seconds/Iteration: 0.009 MSE: 870.87 Iteration: 047, Seconds/Iteration: 0.009 MSE: 36.42 Iteration: 048, Seconds/Iteration: 0.009 MSE: 852.75 Iteration: 049, Seconds/Iteration: 0.009 MSE: 36.37
总结
- 在较小的机器学习问题中使用内存中的方法更快且更有效,而使用内存不足的技术可以扩展到更大的数据集和更大的信息技术资源池:计算、存储、网络。
- 使用数据集批处理结合梯度下降,使得您的 PyTorch 代码能够利用单个节点中的计算资源并扩展到利用计算集群中的多个节点。
- PyTorch 的 IterableDataset 简化了在 PyTorch 代码中对于内存不足和流式数据集的批处理的使用,而 ObjectStorageDataset 实用程序类提供了已经准备好的用于内存不足数据集的实现。
- PyTorch 对于 GPU 的支持是由 CUDA 设备驱动程序实现的,它使得 PyTorch 开发人员可以轻松地扩展现有的 PyTorch 代码,以利用 GPU 的更高吞吐量和更多的并行计算能力。
- 在 PyTorch 中实现基本的线性回归可以为 DC 出租车数据集上预期的训练均方误差提供一个弱基线。
^(1.)ObjectStorageDataset 支持的存储选项的完整列表可在此处找到: filesystem-spec.readthedocs.io/en/latest/
。
^(2.)论文摘要以及 PDF 版本的链接可以从 arXiv 获取:arxiv.org/ abs/1502.01852
。
^(3.)比赛网站以及比赛结果的链接可在此处找到:mng.bz/Koqj
。
^(4.)如 Alex Krizhevsky 的论文所述:www.cs.toronto.edu/~hinton/absps/imagenet.pdf
。
^(5.)谷歌开发了一种用于加速张量操作的张量处理单元(TPU)。
^(6.)关于 PyTorch 数据类型(dtypes)以及相应的 CPU 和 GPU 张量实现的详细文档,请参考mng.bz/9an7
。
^(7.)设备参数对于所有 PyTorch 张量的创建操作都可用,并在此处记录: mng.bz/jj8r
。
第八章:使用分布式训练进行扩展
本章内容包括:
- 理解分布式数据并行梯度下降
- 在梯度下降中使用梯度累积以处理内存不足的数据集
- 对比参数服务器和基于环结构的分布式梯度下降方法
- 理解基于环结构的梯度下降的 reduce-scatter 和 all-gather 阶段
- 使用 Python 实现基于环结构的分布式梯度下降的单节点版本
在第七章中,您了解了如何将机器学习实现扩展到单个计算节点上,以充分利用可用的计算资源。例如,您可以看到如何利用 GPU 设备中更强大的处理器。然而,当您在生产中启动一个机器学习系统时,训练示例的数量增长速度和训练数据集的规模可能会超过即使是最强大的服务器和工作站的计算能力。尽管借助现代公共云基础设施的升级,例如通过升级到更强大的处理器,增加内存或 GPU 设备,可以获得很大的扩展性,但您应该有一个更好的长远计划。
分布式数据并行(DDP)训练是一种依靠扩展而不是升级的机器学习模型训练方法。随着训练数据集的增大,通过将模型训练涉及的计算工作负载划分并在网络计算服务器(节点)集群上进行,可以进行扩展。这里的“节点”是连接成集群的网络上的虚拟或物理服务器。与采用更高性能(也往往更昂贵)的计算节点来执行机器学习模型训练(即扩展方法)不同,通过扩展,您可以将一组较弱甚至是普通的计算节点组成一个网络,并通过在节点之间分布和并行执行工作的方式,可能更早地完成训练。事实上,将训练数据集扩展到更大规模意味着向集群中添加更多的节点。
DDP 模型训练不仅仅是通过向集群中添加节点来进行扩展。“数据并行”方面的 DDP 描述了在集群中,每个节点仅使用训练数据集的独立且互斥的划分(也称为“分片”)来计算梯度。通常,每个分片中的训练示例数量都被选定为确保每个节点的内存可以容纳该分片。虽然在 DDP 方法中,集群中的每个训练节点在梯度下降的每次迭代中都使用数据集的不同分片,但在迭代的范围内,所有节点必须使用相同的模型副本进行训练以计算模型参数梯度。因此,在节点根据训练数据集(或一批训练示例)计算梯度后,节点必须同步到更新后的模型参数版本。
在本章中,您将了解分布式梯度下降的替代方法以及 DDP 梯度下降实现如何帮助您有效地跨任意数量的节点扩展训练,同时使用具有有限计算、内存、存储和带宽资源的实用节点。
8.1 如果训练数据集不适合内存怎么办?
本节及其子节提供了逐步介绍梯度累积以及梯度累积在梯度下降中的作用,以支持超出内存训练数据集的功能。
8.1.1 说明梯度累积
本节演示了使用 PyTorch 的 autograd 库进行梯度累积。虽然本节中的示例是基于使用 autograd 与一个简单函数的情况,但后面的部分将梯度累积应用于更现实的示例。
当使用梯度下降与反向模式累积自动微分时,在执行梯度下降的优化步骤后,有必要清除张量的梯度值。在 PyTorch 中,可以通过将张量的梯度设置为 None 或使用 torch.optim.Optimizer 的辅助方法 zero_grad 来实现此操作。除非将梯度清零(清除),否则由损失函数产生的张量的 backward 方法的调用可能会导致模型的张量中梯度值的累积。以下列表显示了这种行为。
列表 8.1 说明梯度累积对于反向调用的重复调用的插图
import torch as pt x = pt.tensor(3., requires_grad=True) ❶ y = x ** 2 for _ in range(5): y.backward(retain_graph=True) ❷ print(x.grad)
❶ 使用 requires_grad=True 来启用对 y 相对于 x 的微分。
❷ 设置 retain_graph=True 来防止 PyTorch 释放内存。
这会输出
tensor(6.) tensor(12.) tensor(18.) tensor(24.) tensor(30.)
根据对 的 y = x² 的五次重复调用,输出为 3 时为 6。由于累积的结果,x.grad 的输出在 for 循环的 5 次迭代中跳过 6。尽管梯度累积可能看起来像是自动微分的一个不方便的副作用,但在将梯度下降扩展到超出内存数据集和分布式集群时,它可以发挥有用的作用。
8.1.2 准备一个示例模型和数据集
本节描述了如何准备一个示例模型和一个训练数据集,以说明梯度累积在扩展到超出内存数据集时的作用。在下一节中,您将学习如何在梯度下降中使用模型和数据集。
假设您正在处理一个包含 1,000 个结构化记录的训练数据集,并且执行您的梯度下降算法的计算节点只能一次容纳 250 个示例。当然,现代计算环境可以扩展到更大的数据集;然而,选择这些数字将证明对实例有用。让我们首先看一个适合内存的虚构数据集的梯度累积,然后再直接进入现实世界的超出内存数据集的复杂性。
列表 8.2 准备一个样本多元线性回归数据集
pt.manual_seed(42) ❶ FEATURES = 4 ❷ TRAINING_DATASET_SIZE = 1000 ❸ X_train = pt.distributions.multivariate_normal. ➥ MultivariateNormal( ❹ pt.arange(FEATURES, dtype=pt.float32), ❺ pt.eye(FEATURES)).sample((TRAINING_DATASET_SIZE,)) ❻ y_train = X_train @ (pt.arange(FEATURES, dtype=pt.float32) + 1) ❼
❶ 设置伪随机数种子以实现可重现性。
❷ 创建用于多元线性回归问题的数据集。
❸ 在训练示例数据集中使用 1,000 条记录(行)。
❹ 使用 multivariate_normal 生成合成训练数据集。
❺ 使用不同的均值来作为独立变量。
❻ 指定独立变量应该不相关。
❼ 将 X_train 中的特征与系数相乘。
此列表创建了一个训练数据集,其中有四个特征(自变量)和一个因变量(标签),基于每个特征的四个系数 1、2、3、4。例如,假设在生成 X_train 值时使用了种子值 42,则 y_train[0] 的值是从 X_train[0,:] 计算的:
print(X_train[0, :] @ pt.tensor([1, 2, 3, 4], dtype = pt.float32))
应输出
tensor(19.1816)
您还可以通过打印来确认训练数据集张量 X_train 和 y_train 的预期形状
print(X_train.shape, y_train.shape)
应基于 TRAINING_DATASET_SIZE 和 FEATURES 的值输出如下:
(torch.Size([1000, 4]), torch.Size([1000]))
有了训练数据集张量的准备,您可以准备一个线性回归模型和支持方法,使用梯度下降来训练模型。模型 w 是用从标准正态分布中抽取的随机值初始化的。此外,由于模型参数张量 w 被创建为 requires_grad=True,因此张量的初始梯度值设置为 None。
列表 8.3 定义模型 w 和梯度下降的实用方法
pt.manual_seed(42) w = pt.randn(FEATURES, requires_grad = True) ❶ def forward(w, X): ❷ return X @ w def mse(y_est, y): err = y_est - y ❸ return (err ** 2).mean() ❹
❶ 创建多元线性回归问题的模型。
❷ 基于模型 w 实现梯度下降的前向步骤。
❸ 计算目标(y)的误差(残差)。
❹ 返回均方误差的值。
尽管您可以使用更复杂的技术来初始化 w,但在这种情况下,多元线性回归问题足够简单,不需要增加复杂性。
8.1.3 理解使用超出内存的数据片段的梯度下降
在本节中,使用第 8.1.2 节准备的模型和数据集,使用梯度下降利用 autodiff 的梯度累积特性来扩展到超出内存的数据集。
通过依赖梯度累积,梯度下降可以使用图 8.1 中所示的方法基于整个训练数据集(即梯度下降的一个时期)来计算梯度。注意不要将图 8.1 中显示的分片与 mini-batch 梯度下降中使用的批次混淆;区别在下面的段落中进行了澄清。
图 8.1 梯度累积重新使用分片内存以实现对超出内存的数据集的扩展。
图 8.1 的左侧显示了使用 [0:250][0] 表示训练数据集中的前 250 个示例(记录)的第一个分片,[0:250][1] 表示第二个分片,即记录从 250 到 500,依此类推。在这里,使用 Python 切片表示法(例如,[0:250])来指定训练数据集中的哪些 1,000 个示例包含在一个分片中。
请注意,在图 8.1 中,每个分片都使用相同的模型 w 进行处理(在梯度下降的前向和后向步骤中),或者更准确地说,使用相同的 w 模型参数值。虽然图 8.1 中梯度积累的四个顺序步骤中模型参数值是相同的,但由于每个分片包含训练示例的不同集合,因此为每个分片计算的梯度也是不同的,并且是特定于分片的。在图中,使用下标表示分片及其相应的梯度之间的关系,以便分片 [0:250][0] 产生梯度g[0],依此类推。
一旦每个分片的训练样本计算出梯度(见清单 8.4),则不会使用分片梯度来更新模型参数。相反,梯度被保留在模型张量中累积。因此,在第二个训练示例分片通过前向方法处理,然后后向方法计算相应的梯度g[1]之后,模型张量 w.grad 包含梯度g[0]+g[1]的总和(累积)。
请注意,使用分片进行计算与小批量梯度下降中的批量计算不同,其中来自每个批次的梯度用于更新模型参数,然后清除。将批次与分片区分开很有用,因为两者都可以与梯度下降一起使用;例如,分片可以是批次的分区,在数据批次不适合节点内存的情况下。分片还可以由多个批次组成,以便通过处理存储在节点内存中的多个批次来加速梯度下降。虽然可以将分片与小批量梯度下降一起使用,但本节重点介绍使用分片与普通梯度下降的更基本示例,其中根据整个训练示例集计算梯度。
仅在处理完整个训练数据集后,一次处理一个分片,图 8.1 中的算法才执行基于累积梯度g[0]+g[1]+g[2]+g[3]的梯度下降的优化步骤。
清单 8.4 使用 IN_MEMORY_SHARD_SIZE 示例的梯度下降
EPOCHS = 500 LEARNING_RATE = 0.01 IN_MEMORY_SHARD_SIZE = 250 for epoch in range(EPOCHS): for i in range(0, \ TRAINING_DATASET_SIZE // IN_MEMORY_SHARD_SIZE): ❶ start_idx = i * IN_MEMORY_SHARD_SIZE end_idx = start_idx + IN_MEMORY_SHARD_SIZE y_shard = y_train[start_idx : end_idx] X_shard = X_train[start_idx : end_idx] ❷ y_est = forward(w, X_shard) ❸ loss = \ ❹ (IN_MEMORY_SHARD_SIZE / TRAINING_DATASET_SIZE) * mse(y_est, y_shard) loss.backward() ❺ #notice that the following is #in scope of the outer for loop w.data -= LEARNING_RATE * w.grad ❻ w.grad = None ❼
❶ 每个周期执行 TRAINING_DATASET_SIZE // IN_MEMORY_SHARD_SIZE 次迭代。
❷ 将训练示例分配给 y_shard 和 X_shard。
❸ 执行梯度下降的前向步骤。
❹ 计算调整后的分片大小训练损失。
❺ 执行反向传播和梯度累积
❻ 执行梯度下降优化步骤。
❼ 清除模型张量的梯度。
代码执行后,打印语句
print(w)
应该输出
tensor([1.0000, 2.0000, 3.0000, 4.0000], requires_grad=True)
证实梯度下降正确地恢复了列表 8.2 中使用的系数[1.0000,2.0000,3.0000,4.0000],以创建由 y_train 和 X_train 组成的训练数据集。
在列表 8.4 中计算损失时使用的分数(IN_MEMORY_SHARD_SIZE / TRAINING_DATASET_SIZE)微妙但重要。回想一下,该列表旨在计算整个训练示例或更准确地说是 TRAINING_DATASET_SIZE 示例的梯度。mse 方法的默认实现,计算模型估计值 y_est 的均方误差,假定在计算期间有 IN_MEMORY_SHARD_SIZE 个示例。换句话说,在列表中内部 for 循环的每次迭代中,通过计算 mse 来计算,或者在 PyTorch 中等效地使用
(1 / IN_MEMORY_SHARD_SIZE) * ((y_est - y_shard) ** 2).sum()
返回每个 IN_MEMORY_DATASET_SIZE 示例的均方误差。列表 8.4 中在计算损失时使用的(IN_MEMORY_SHARD_SIZE / TRAINING_DATASET_SIZE)分数将均方误差重新缩放为 TRAINING_DATASET_SIZE 示例。
通过这个以方程表示的乘法,注意到重新缩放相当于 IN_MEMORY_DATASET_SIZE,这在的分子和分母中取消了。
当内部 for 循环完成时,w.grad 包含训练示例梯度的总和,因此代码 w.data -= LEARNING_RATE * w.grad 计算了整个 epoch 的片段的优化步骤。换句话说,在列表 8.4 中的梯度下降实现中,梯度优化步骤是针对每个训练示例的 epoch 执行一次。这证实了列表 8.4 中的实现不是小批量梯度下降。
虽然图 8.1 中的方法使得可以在使用任意片段大小的内存外数据集上进行扩展,但它遭受着一个显著的算法复杂性问题:内部 for 循环是顺序的,这会将梯度下降实现的大零性能从O(EPOCHS)变为O(EPOCHS * SHARDS)。
将列表 8.4 中的内部 for 循环分布到一组并行工作节点上,可以将实现恢复到原始O(EPOCHS)最坏情况的性能。但是如何高效地实现呢?
大规模 MLOps 工程(三)(3)https://developer.aliyun.com/article/1517791