大规模 MLOps 工程(四)(1)https://developer.aliyun.com/article/1517805
11.3.1 使用 Optuna 研究进行超参数优化
本节介绍了 Optuna 研究的概念,描述了它与 Optuna 试验的关系,并帮助您使用研究实例来运行和分析 HPO 实现中的一组试验。
在 Optuna 中,目标函数负责执行单个试验,包括从 Optuna 中检索超参数值、训练模型,然后根据验证损失评估训练模型的步骤。因此,每个试验仅向 Optuna HPO 算法返回单个评估指标,以便决定下一组建议的超参数值用于下一个试验。典型的 HPO 过程涉及几十甚至几百次试验;因此,有能力组织试验、比较其结果并分析试验中涉及的超参数是很重要的。在 Optuna 中,研究扮演着相关试验的容器角色,并提供有关试验结果和相关超参数的表格数据以及可视化数据。
如图 11.5 所示,通过一个目标函数,一个研究由优化方向(例如,最小化或最大化函数)和采样器来定义,采样器基于 Optuna 支持的许多 HPO 算法和框架之一实例化。用于初始化研究的种子(图 11.5 ❷)与用于初始化 PyTorch 和 NumPy 的种子值不同。在使用 HPO 时,此种子可用于创建下游随机数生成器的种子值,包括 Python 自己的随机数生成器。尽管 HPO 种子值从优化 DcTaxiModel 机器学习性能的角度来看毫无用处,但它确实具有确保 HPO 试验的可重现性的重要目的。
列表 11.4 中显示了 DcTaxiModel 的整个 HPO 实现。
列表 11.5 用于执行 HPO 的 Optuna 研究
def objective(trial): hparams = { "seed": trial.suggest_int('seed', 0, pt.iinfo(pt.int32).max - 1), "num_features": "8", "batch_norm_linear_layers": \ str(trial.suggest_int('batch_norm_linear_layers', 0, 1)), "optimizer": trial.suggest_categorical('optimizer', ['Adam', 'SGD']), "lr": trial.suggest_loguniform('lr', 0.009, 0.07), "num_hidden_neurons": \ str([trial.suggest_categorical(f"num_hidden_layer_{layer}_neurons", [7, 11]) for layer in \ range(trial.suggest_categorical('num_layers', [2, 3]))]), "batch_size": trial.suggest_int('batch_size', 30, 50, log = True), "max_batches": trial.suggest_int('max_batches', 30, 50, log = True) } model, trainer = build(DcTaxiModel(**hparams), train_glob = 'https://raw.githubusercontent.com/osipov/smlbook/ ➥ master/train.csv', val_glob = 'https://raw.githubusercontent.com/osipov/smlbook/ ➥ master/valid.csv') return trainer.callback_metrics['train_val_rmse'].item() import optuna from optuna.samplers import TPESampler ❶ study = \ optuna.create_study(direction = 'minimize', ❷ sampler = TPESampler( seed = 42 ),) study.optimize(objective, n_trials = 100) ❸
❶ 配置研究以最小化 DcTaxiModel 的 MSE 损失。
❷ 使用 TPE 算法进行 HPO。
❸ 使用 100 次试验开始 HPO。
在执行列表 11.5 中的代码后,study.optimize 方法完成了 100 次 HPO 试验。使用以下方式可获得各个试验的详细信息
study_df = study.trials_dataframe().sort_values(by='value', ascending = True) study_df[:5][['number', 'value', 'params_seed']],
应返回一个类似以下值的 pandas 数据帧:
number | value | params_seed |
96 | 2.390541 | 1372300804 |
56 | 7.403345 | 1017301131 |
71 | 9.006614 | 939699871 |
74 | 9.139935 | 973536326 |
94 | 9.817746 | 1075268021 |
其中,数字列指定由 Optuna 建议的试验的索引,值是由目标方法中的 trainer.callback_ metrics[‘train_val_rmse’].item() 返回的损失函数的相应值,params_seed 是用于初始化 DcTaxiModel 的模型参数(权重)的种子值。
11.3.2 在 Optuna 中可视化 HPO 研究
本节介绍了本章中使用三种不同 Optuna 可视化执行的 HPO 研究,并比较了这些可视化图在 HPO 方面的相关性。
完成的研究实例也可以使用 Optuna 可视化包进行可视化。虽然全面概述 Optuna 中各种可视化的范围超出了本书的范围,但我发现自己在一系列机器学习模型中一直重复使用三个可视化图。这些可视化图将在本节的其余部分按重要性下降的顺序进行解释。
超参数重要性图揭示了关于超参数对目标函数相对影响的惊人信息。在列表中包含种子超参数特别有用,以评估某些超参数是否比仅用于模型初始化的随机变量具有更多或更少的重要性。比随机种子更重要的超参数值得进一步研究,而比随机变量不重要的超参数应该降低优先级。
要创建重要性图,您可以使用
optuna.visualization.plot_param_importances(study)
这应该会生成类似于图 11.1 的条形图。
图 11.1 重要性图有助于指导后续 HPO 迭代。
一旦您确定了要更详细探讨的一组超参数,下一步就是在平行坐标图上绘制它们。您可以使用以下命令实例化此图。
optuna.visualization.plot_parallel_coordinate(study, params=["lr", "batch_size", "num_hidden_layer_0_neurons"])
这会绘制学习率(lr)、批量大小和 num_hidden_layer_0_neurons 超参数之间的关系。请注意,在图 11.2 中,线条代表各个试验配置,颜色较深的线条对应于目标函数值较低的试验。因此,通过某个超参数的一系列较深线条穿过一定区间,表明该超参数区间值得更仔细检查,可能需要进行另一次 HPO 迭代。
图 11.2 平行坐标图有助于确定超参数值的影响区间。
到目前为止描述的图中,轮廓图在生成有关研究结果的见解方面排在最后。由于轮廓图仅限于可视化超参数值对的图形,您会发现自己生成多个轮廓图,通常基于从重要性图或平行坐标图中选择的超参数。例如,要绘制批量大小、学习率和目标函数之间的关系,您可以运行
optuna.visualization.plot_contour(study, params=["batch_size", "lr"])
这应该生成类似于图 11.3 的图形。
图 11.3 轮廓图有助于分析与损失函数相关的超参数对。
摘要
- Optuna 是一个超参数优化框架,具有与本机 Python 集成,可支持机器学习模型实验中的复杂超参数配置。
- 当使用 HPO 用于跨量级的超参数范围时,采用对数均匀采样是有用的,以确保样本在范围内均匀分布而不是倾斜分布。
- 在执行 HPO 试验之后,Optuna 的可视化功能有助于分析试验结果和相关的超参数。
^(1.)无梯度算法不需要计算损失(或任何目标)函数的梯度来优化函数参数。换句话说,无梯度的超参数优化可以在目标函数没有可计算梯度的情况下进行优化。
^(2.)最初广泛引用的论文《Batch Normalization: Accelerating Deep Network Training by Reducing Internal Covariate Shift》介绍了批归一化,并可从arxiv.org/abs/1502.03167
获取。
第十二章:机器学习管道
本章包括
- 了解具有实验管理和超参数优化的机器学习管道
- 为了减少样板代码,为 DC 出租车模型实现 Docker 容器
- 部署机器学习管道以训练模型
到目前为止,你已经学习了机器学习的各个独立阶段或步骤。一次只专注于机器学习的一个步骤有助于集中精力处理更可管理的工作范围。然而,要部署一个生产机器学习系统,有必要将这些步骤集成到一个单一的管道中:一个步骤的输出流入到管道后续步骤的输入中。此外,管道应该足够灵活,以便启用超参数优化(HPO)过程来管理并对管道各阶段执行的具体任务进行实验。
在本章中,您将了解到用于集成机器学习管道、部署到 AWS 并使用实验管理和超参数优化训练 DC 出租车车费估算机器学习模型的概念和工具。
12.1 描述机器学习管道
本节介绍了解释本章描述的机器学习管道实现所需的核心概念。
为了澄清本章描述的机器学习管道的范围,有助于从整个管道的输入和输出描述开始。在输入方面,管道期望的是从探索性数据分析(EDA)和数据质量(数据清理)过程产生的数据集。机器学习管道的输出是一个或多个训练好的机器学习模型,这意味着管道的范围不包括将模型部署到生产环境的步骤。由于管道的输入和输出要求人机交互(EDA 和数据质量)或可重复自动化(模型部署),它们都不在 HPO 的范围内。
要了解机器学习管道所需的期望特性,请参考图 12.1。
图 12.1 一个统一的机器学习管道可以在每个阶段进行超参数优化。
在图中,数据准备、特征工程和机器学习模型训练阶段由 HPO 管理。使用由 HPO 管理的阶段可能会导致关于是否
- 在数据准备阶段,具有缺失数值特征的训练示例将从训练数据集中删除或更新为将缺失值替换为特征的预期(均值)值
- 在特征工程阶段,数值位置特征(如纬度或经度坐标)将通过分箱转换为具有 64 或 128 个不同值的分类特征
- 在机器学习训练阶段,模型使用随机梯度下降(SGD)或 Adam 优化器进行训练
尽管实施图 12.1 中的管道可能看起来很复杂,但通过使用一系列 PyTorch 和配套框架,您将能够在本节结束时部署它。本节中管道的实施依赖于以下技术:
- MLFlow — 用于开源实验管理
- Optuna — 用于超参数优化
- Docker — 用于管道组件打包和可重复执行
- PyTorch Lightning — 用于 PyTorch 机器学习模型训练和验证
- Kaen — 用于跨 AWS 和其他公共云提供商的管道供应和管理
在继续之前,总结一下将更详细地描述管道的 HPO 方面的关键概念是有帮助的。图 12.2 中的图表澄清了管道、HPO 和相关概念之间的关系。像 MLFlow 这样的实验管理平台(其他示例包括 Weights & Biases、Comet.ML 和 Neptune.AI)存储和管理实验实例,以便每个实例对应于不同的机器学习管道。例如,实验管理平台可以为实现训练 DC 出租车票价估算模型的机器学习管道存储一个实验实例,并为用于在线聊天机器人的自然语言处理模型训练的机器学习管道存储一个不同的实验实例。实验实例彼此隔离,但由单个实验管理平台管理。
图 12.2 实验管理器根据 HPO 设置控制管道执行(作业)实例。
每个实验实例使用 父运行 作为一个或多个机器学习管道执行(子运行)的集合。父运行配置为应用于多个管道执行的设置,例如 HPO 引擎(如 Optuna)使用的伪随机数种子的值。父运行还指定应执行以完成父运行的子运行(机器学习管道执行)的总数。由于每个机器学习管道执行还对应于一组唯一的超参数键/值对的组合,父运行指定的子运行数还指定了 HPO 引擎应生成的 HPO 试验(值集)的总数,以完成父运行。
机器学习管道代码与实验管理、超参数优化和机器学习模型训练的服务一起部署为 Docker 容器,在云提供商的虚拟专用云(VPC)网络中相互连接。该部署如图 12.3 所示。
图 12.3 具有 HPO 的机器学习管道部署为包含至少一个管理节点和一个工作节点以及可选管理节点和工作节点的一组 Docker 容器。
如图所示,为了部署具有 HPO 的机器学习管道,至少需要两个 Docker 容器在虚拟专用云网络上连接,部署中至少有一个管理器和至少一个工作节点。管理节点托管具有
- 实验管理服务(例如 MLFlow)
- HPO 引擎(例如 Optuna)作为与实验管理集成的服务运行(例如 Kaen 的 BaseOptunaService)
- 实验管理用户界面
- 工作节点管理服务,用于在工作节点上安排和编排机器学习管道子运行
工作节点托管具有机器学习模型(例如 PyTorch 代码)的 Docker 容器,以及描述如何根据超参数(例如 PyTorch Lightning 代码)训练、验证和测试机器学习模型的代码。
请注意,管理节点和工作节点的生命周期与节点上 Docker 容器执行的生命周期不同。这意味着相同的节点可以托管多个容器实例的执行和多个机器学习管道运行,而无需进行配置或取消配置。此外,管理节点上的容器是长时间运行的,例如为了在多个机器学习管道执行期间提供实验服务用户界面和超参数优化引擎服务,而工作节点上的容器仅在机器学习管道执行期间保持运行。
尽管本节描述的部署配置可能看起来复杂,但节点的提供、机器学习中间件(实验管理、超参数优化等)以及机器学习管道在工作节点之间的执行的编排完全由 Kaen 框架和相关的 Docker 容器处理。您将在本章后面学习有关该框架以及如何在现有的 Kaen 容器上构建您的机器学习管道的更多信息。
12.2 使用 Kaen 启用 PyTorch 分布式训练支持
本节说明了如何使用 PyTorch DistributedDataParallel 类添加 PyTorch 分布式训练支持。通过本节的结束,DC 出租车费用模型的 train 方法将被扩展以与云环境中的分布式训练框架 Kaen 集成。
与本书前几章的代码和 Jupyter 笔记本说明不同,本章剩余部分的代码要求您的环境已安装 Docker 和 Kaen。有关安装 Docker 和开始使用的更多信息,请参阅附录 B。要将 Kaen 安装到已存在 Docker 安装的环境中,请执行
pip install kaen[cli,docker]
这将下载并安装 kaen 命令行界面(CLI)到您的 shell 环境中。例如,如果 Kaen 安装正确,您可以使用以下命令获取有关 Kaen 命令的帮助:
kaen --help
这应该会产生类似以下的输出:
Usage: kaen [OPTIONS] COMMAND [ARGS]... Options: --help Show this message and exit. Commands: dojo Manage a dojo training environment. hpo Manage hyperparameter optimization. init Initialize a training dojo in a specified infrastructure... job Manage jobs in a specific dojo training environment. jupyter Work with a Jupyter notebook environment.
要执行本书其余部分的说明,请启动 Kaen Jupyter 环境,使用以下命令:
kaen jupyter
从您的 shell 环境中执行以下命令,这应该会在您本地的 Docker 主机上启动一个专门的 Jupyter 笔记本环境作为一个新的 Docker 容器。kaen jupyter 命令还应该将您的默认浏览器导航到 Jupyter 主页,并在 shell 中输出类似以下的文本:
Started Jupyter. Attempting to navigate to Jupyter in your browser using ➥ http://127.0.0.1:8888/?token=...
它指定了您可以在浏览器中使用的 URL,以打开新启动的 Jupyter 实例。
在 Jupyter 环境中,创建并打开一个新的笔记本。例如,您可以将笔记本命名为 ch12.ipynb。作为笔记本中的第一步,您应该执行 shell 命令
!mkdir -p src
在此环境中为您的代码创建一个 src 目录。请记住,在 Jupyter 中的 Python 代码单元格中使用感叹号!时,其后的命令将在底层的 bash shell 中执行。因此,运行该代码的结果是在文件系统中创建一个 src 目录。
接下来,使用 %%writefile 魔术将 DC 出租车模型的最新版本(如第十一章所述)保存到 src 目录中的 model_v1.py 文件中。
列表 12.1 将实现保存到 model_v1.py
%%writefile src/model_v1.py import sys import json import time import torch as pt import pytorch_lightning as pl from distutils.util import strtobool pt.set_default_dtype(pt.float64) class DcTaxiModel(pl.LightningModule): def __init__(self, **kwargs): super().__init__() self.save_hyperparameters() pt.manual_seed(int(self.hparams.seed)) self.step = 0 self.start_ts = time.perf_counter() self.train_val_rmse = pt.tensor(0.) #create a list of hidden layer neurons, e.g. [3, 5, 8] num_hidden_neurons = json.loads(self.hparams.num_hidden_neurons) self.layers = \ pt.nn.Sequential( pt.nn.Linear(int(self.hparams.num_features), num_hidden_neurons[0]), pt.nn.ReLU(), *self.build_hidden_layers(num_hidden_neurons, pt.nn.ReLU()), pt.nn.Linear(num_hidden_neurons[-1], 1) ) if 'batch_norm_linear_layers' in self.hparams \ and strtobool(self.hparams.batch_norm_linear_layers): self.layers = self.batch_norm_linear(self.layers) def build_hidden_layers(self, num_hidden_neurons, activation): linear_layers = [ pt.nn.Linear(num_hidden_neurons[i], num_hidden_neurons[i+1]) \ for i in range(len(num_hidden_neurons) - 1) ] classes = [activation.__class__] * len(num_hidden_neurons) activation_instances = list(map(lambda x: x(), classes)) hidden_layer_activation_tuples = \ list(zip(linear_layers, activation_instances)) hidden_layers = [i for sublist in \ hidden_layer_activation_tuples for i in sublist] return hidden_layers def batch_norm_linear(self, layers): idx_linear = \ list(filter(lambda x: type(x) is int, [idx if issubclass(layer.__class__, pt.nn.Linear) else None \ for idx, layer in enumerate(layers)])) idx_linear.append(sys.maxsize) layer_lists = [list(iter(layers[s:e])) \ for s, e in zip(idx_linear[:-1], idx_linear[1:])] batch_norm_layers = [pt.nn.BatchNorm1d(layer[0].in_features) \ for layer in layer_lists] batch_normed_layer_lists = [ [bn, *layers] \ for bn, layers in list(zip(batch_norm_layers, layer_lists)) ] return pt.nn.Sequential(*[layer \ for nested_layer in batch_normed_layer_lists \ for layer in nested_layer ]) def batchToXy(self, batch): batch = batch.squeeze_() X, y = batch[:, 1:], batch[:, 0] return X, y def forward(self, X): y_est = self.layers(X) return y_est.squeeze_() def log(self, k, v, **kwargs): super().log(k, v, on_step = kwargs['on_step'], on_epoch = kwargs['on_epoch'], prog_bar = kwargs['prog_bar'], logger = kwargs['logger'],) def training_step(self, batch, batch_idx): self.step += 1 X, y = self.batchToXy(batch) #unpack batch into features and label y_est = self.forward(X) loss = pt.nn.functional.mse_loss(y_est, y) for k,v in { "train_step": self.step, "train_mse": loss.item(), "train_rmse": loss.sqrt().item(), "train_steps_per_sec": \ self.step / (time.perf_counter() - self.start_ts), }.items(): self.log(k, v, step = self.step, on_step=True, on_epoch=True, prog_bar=True, logger=True) self.train_val_rmse = loss.sqrt() return loss def validation_step(self, batch, batch_idx): X, y = self.batchToXy(batch) with pt.no_grad(): loss = pt.nn.functional.mse_loss(self.forward(X), y) for k,v in { "val_mse": loss.item(), "val_rmse": loss.sqrt().item(), "train_val_rmse": (self.train_val_rmse + loss.sqrt()).item(), }.items(): self.log(k, v, step = self.step, on_step=True, on_epoch=True, prog_bar=True, logger=True) return loss def test_step(self, batch, batch_idx): X, y = self.batchToXy(batch) with pt.no_grad(): loss = pt.nn.functional.mse_loss(self.forward(X), y) for k,v in { "test_mse": loss.item(), "test_rmse": loss.sqrt().item(), }.items(): self.log(k, v, step = self.step, on_step=True, on_epoch=True, prog_bar=True, logger=True) def configure_optimizers(self): optimizers = {'Adam': pt.optim.AdamW, 'SGD': pt.optim.SGD} optimizer = optimizers[self.hparams.optimizer] return optimizer(self.layers.parameters(), lr = float(self.hparams.lr))
由于列表 12.1 中的代码将 DC 出租车模型的版本 1 保存到名为 model_v1.py 的文件中,因此构建和测试该模型版本的过程的入口点(在 src 目录的 trainer.py 文件中)从加载 model_v1 包中的 DC 出租车模型实例开始:
%%writefile src/trainer.py from model_v1 import DcTaxiModel import os import time import kaen import torch as pt import numpy as np import pytorch_lightning as pl import torch.distributed as dist from torch.utils.data import DataLoader from torch.nn.parallel import DistributedDataParallel from kaen.torch import ObjectStorageDataset as osds def train(model, train_glob, val_glob, test_glob = None): #set the pseudorandom number generator seed seed = int(model.hparams['seed']) \ ❶ if 'seed' in model.hparams \ else int( datetime.now().microsecond ) np.random.seed(seed) pt.manual_seed(seed) kaen.torch.init_process_group(model.layers) ❷ trainer = pl.Trainer(gpus = pt.cuda.device_count() \ if pt.cuda.is_available() else 0, max_epochs = 1, limit_train_batches = int( model.hparams.max_batches ) \ if 'max_batches' in model.hparams else 1, limit_val_batches = 1, num_sanity_val_steps = 1, val_check_interval = min(20, int( model.hparams.max_batches ) ), limit_test_batches = 1, log_every_n_steps = 1, gradient_clip_val=0.5, progress_bar_refresh_rate = 0, weights_summary = None,) train_dl = \ DataLoader(osds(train_glob, worker = kaen.torch.get_worker_rank(), replicas = kaen.torch.get_num_replicas(), shard_size = \ ❸ int(model.hparams.batch_size), batch_size = \ ❹ int(model.hparams.batch_size), storage_options = {'anon': False}, ), pin_memory = True) val_dl = \ DataLoader(osds(val_glob, batch_size = int(model.hparams.batch_size), storage_options = {'anon': False}, ), pin_memory = True) trainer.fit(model, train_dataloaders = train_dl, val_dataloaders = val_dl) if test_glob is not None: test_dl = \ DataLoader(osds(test_glob, batch_size = int(model.hparams.batch_size), storage_options = {'anon': False}, ), pin_memory = True) trainer.test(model, dataloaders=test_dl) return model, trainer if __name__ == "__main__": model, trainer = train(DcTaxiModel(**{ "seed": "1686523060", "num_features": "8", "num_hidden_neurons": "[3, 5, 8]", "batch_norm_linear_layers": "1", "optimizer": "Adam", "lr": "0.03", "max_batches": "1", "batch_size": str(2 ** 18),}), train_glob = \ os.environ['KAEN_OSDS_TRAIN_GLOB'] \ if 'KAEN_OSDS_TRAIN_GLOB' in os.environ \ else 'https://raw.githubusercontent.com/osipov/smlbook/ ➥ master/train.csv', val_glob = \ os.environ['KAEN_OSDS_VAL_GLOB'] \ if 'KAEN_OSDS_VAL_GLOB' in os.environ \ else 'https://raw.githubusercontent.com/osipov/smlbook/ ➥ master/valid.csv', test_glob = \ os.environ['KAEN_OSDS_TEST_GLOB'] \ if 'KAEN_OSDS_TEST_GLOB' in os.environ \ else 'https://raw.githubusercontent.com/osipov/smlbook/ ➥ master/valid.csv') print(trainer.callback_metrics)
❶ 使用超参数或当前时间戳初始化伪随机数种子。
❷ 自动更新 DC 出租车模型,以利用多个训练节点(如果有的话)。
❸ 正如第八章中所述,在分布式集群中,shard_size 往往不同于 . . .
❹ . . . 用于计算梯度的 batch_size。
在此时,您可以通过从您的 shell 环境中运行以下命令对 trainer.py 进行单元测试。
列表 12.2 运行一个简单的测试来确认实现是否按预期工作。
%%bash python3 src/trainer.py
这应该会使用 DC 出租车数据的小样本训练、测试并报告您模型的度量标准。
大规模 MLOps 工程(四)(3)https://developer.aliyun.com/article/1517809