Sklearn、TensorFlow 与 Keras 机器学习实用指南第三版(五)(2)https://developer.aliyun.com/article/1482432
练习
- 您如何用简短的句子描述 TensorFlow?它的主要特点是什么?您能否列出其他流行的深度学习库?
- TensorFlow 是否可以替代 NumPy?它们之间的主要区别是什么?
tf.range(10)
和tf.constant(np.arange(10))
会得到相同的结果吗?- 您能否列出 TensorFlow 中除了常规张量之外的其他六种数据结构?
- 您可以通过编写函数或子类化
tf.keras.losses.Loss
类来定义自定义损失函数。您会在什么时候使用每个选项? - 同样,您可以在函数中定义自定义指标,也可以作为
tf.keras.metrics.Metric
的子类。您会在什么时候使用每个选项? - 何时应该创建自定义层而不是自定义模型?
- 有哪些需要编写自定义训练循环的用例?
- 自定义 Keras 组件可以包含任意的 Python 代码吗,还是必须可转换为 TF 函数?
- 如果您希望函数可转换为 TF 函数,主要需要遵守哪些规则?
- 何时需要创建一个动态的 Keras 模型?如何做到这一点?为什么不将所有模型都设置为动态的呢?
- 实现一个执行层归一化的自定义层(我们将在第十五章中使用这种类型的层):
build()
方法应该定义两个可训练的权重α和β,形状都是input_shape[-1:]
,数据类型为tf.float32
。α应该初始化为 1,β初始化为 0。call()
方法应该计算每个实例特征的平均值μ和标准差σ。为此,您可以使用tf.nn.moments(inputs, axes=-1, keepdims=True)
,它返回所有实例的平均值μ和方差σ²(计算方差的平方根以获得标准差)。然后函数应该计算并返回α ⊗ (X - μ)/(σ + ε) + β,其中 ⊗ 表示逐元素乘法(*
),ε是一个平滑项(一个小常数,避免除以零,例如 0.001)。- 确保您的自定义层产生与
tf.keras.layers.LayerNormalization
层相同(或非常接近)的输出。
- 使用自定义训练循环训练一个模型,以处理 Fashion MNIST 数据集(参见第十章):
- 显示每个时代、迭代、平均训练损失和每个时代的平均准确率(在每次迭代更新),以及每个时代结束时的验证损失和准确率。
- 尝试使用不同的优化器以及不同的学习率来处理上层和下层。
这些练习的解决方案可以在本章笔记本的末尾找到,网址为https://homl.info/colab3。
然而,Facebook 的 PyTorch 库目前在学术界更受欢迎:比起 TensorFlow 或 Keras,更多的论文引用 PyTorch。此外,Google 的 JAX 库正在获得动力,尤其是在学术界。
TensorFlow 包括另一个名为estimators API的深度学习 API,但现在已经不推荐使用。
如果您有需要(但您可能不会),您可以使用 C++ API 编写自己的操作。
要了解更多关于 TPU 以及它们如何工作的信息,请查看https://homl.info/tpus。
tf.math.log()
是一个值得注意的例外,它通常被使用,但没有tf.log()
的别名,因为这可能会与日志记录混淆。
使用加权平均值不是一个好主意:如果这样做,那么具有相同权重但在不同批次中的两个实例将对训练产生不同的影响,这取决于每个批次的总权重。
{**x, [...]}
语法是在 Python 3.5 中添加的,用于将字典x
中的所有键/值对合并到另一个字典中。自 Python 3.9 起,您可以使用更好的x | y
语法(其中x
和y
是两个字典)。
然而,Huber 损失很少用作度量标准——通常更喜欢使用 MAE 或 MSE。
这个类仅用于说明目的。一个更简单和更好的实现方法是只需子类化tf.keras.metrics.Mean
类;请参阅本章笔记本的“流式指标”部分以获取示例。
Keras API 将此参数称为input_shape
,但由于它还包括批量维度,我更喜欢将其称为batch_input_shape
。
在 Keras 中,“子类 API”通常只指通过子类化创建自定义模型,尽管在本章中您已经看到,许多其他东西也可以通过子类化创建。
由于 TensorFlow 问题#46858,这种情况下调用super().build()
可能会失败,除非在您阅读此内容时已修复该问题。如果没有,请将此行替换为self.built = True
。
您还可以在模型内的任何层上调用add_loss()
,因为模型会递归地从所有层中收集损失。
如果磁带超出范围,例如当使用它的函数返回时,Python 的垃圾收集器会为您删除它。
除了优化器之外,很少有人会自定义这些;请参阅笔记本中的“自定义优化器”部分以获取示例。
然而,在这个简单的例子中,计算图非常小,几乎没有任何优化的空间,所以tf_cube()
实际上比cube()
运行得慢得多。
第十三章:使用 TensorFlow 加载和预处理数据
在第二章中,您看到加载和预处理数据是任何机器学习项目的重要部分。您使用 Pandas 加载和探索(修改后的)加利福尼亚房屋数据集——该数据集存储在 CSV 文件中——并应用 Scikit-Learn 的转换器进行预处理。这些工具非常方便,您可能会经常使用它们,特别是在探索和实验数据时。
然而,在大型数据集上训练 TensorFlow 模型时,您可能更喜欢使用 TensorFlow 自己的数据加载和预处理 API,称为tf.data。它能够非常高效地加载和预处理数据,使用多线程和排队从多个文件中并行读取数据,对样本进行洗牌和分批处理等。此外,它可以实时执行所有这些操作——在 GPU 或 TPU 正在训练当前批次数据时,它会在多个 CPU 核心上加载和预处理下一批数据。
tf.data API 允许您处理无法放入内存的数据集,并充分利用硬件资源,从而加快训练速度。tf.data API 可以直接从文本文件(如 CSV 文件)、具有固定大小记录的二进制文件以及使用 TensorFlow 的 TFRecord 格式的二进制文件中读取数据。
TFRecord 是一种灵活高效的二进制格式,通常包含协议缓冲区(一种开源二进制格式)。tf.data API 还支持从 SQL 数据库中读取数据。此外,许多开源扩展可用于从各种数据源中读取数据,例如 Google 的 BigQuery 服务(请参阅https://tensorflow.org/io)。
Keras 还提供了强大而易于使用的预处理层,可以嵌入到您的模型中:这样,当您将模型部署到生产环境时,它将能够直接摄取原始数据,而无需您添加任何额外的预处理代码。这消除了训练期间使用的预处理代码与生产中使用的预处理代码之间不匹配的风险,这可能会导致训练/服务偏差。如果您将模型部署在使用不同编程语言编写的多个应用程序中,您不必多次重新实现相同的预处理代码,这也减少了不匹配的风险。
正如您将看到的,这两个 API 可以联合使用——例如,从 tf.data 提供的高效数据加载和 Keras 预处理层的便利性中受益。
在本章中,我们将首先介绍 tf.data API 和 TFRecord 格式。然后我们将探索 Keras 预处理层以及如何将它们与 tf.data API 一起使用。最后,我们将快速查看一些相关的库,您可能会发现它们在加载和预处理数据时很有用,例如 TensorFlow Datasets 和 TensorFlow Hub。所以,让我们开始吧!
tf.data API
整个 tf.data API 围绕着 tf.data.Dataset
的概念展开:这代表了一系列数据项。通常,您会使用逐渐从磁盘读取数据的数据集,但为了简单起见,让我们使用 tf.data.Dataset.from_tensor_slices()
从一个简单的数据张量创建数据集:
>>> import tensorflow as tf >>> X = tf.range(10) # any data tensor >>> dataset = tf.data.Dataset.from_tensor_slices(X) >>> dataset <TensorSliceDataset shapes: (), types: tf.int32>
from_tensor_slices()
函数接受一个张量,并创建一个 tf.data.Dataset
,其中的元素是沿着第一维度的所有 X
的切片,因此这个数据集包含 10 个项目:张量 0、1、2、…、9。在这种情况下,如果我们使用 tf.data.Dataset.range(10)
,我们将获得相同的数据集(除了元素将是 64 位整数而不是 32 位整数)。
您可以简单地迭代数据集的项目,如下所示:
>>> for item in dataset: ... print(item) ... tf.Tensor(0, shape=(), dtype=int32) tf.Tensor(1, shape=(), dtype=int32) [...] tf.Tensor(9, shape=(), dtype=int32)
注意
tf.data API 是一个流式 API:您可以非常高效地迭代数据集的项目,但该 API 不适用于索引或切片。
数据集还可以包含张量的元组,或名称/张量对的字典,甚至是张量的嵌套元组和字典。在对元组、字典或嵌套结构进行切片时,数据集将仅切片它包含的张量,同时保留元组/字典结构。例如:
>>> X_nested = {"a": ([1, 2, 3], [4, 5, 6]), "b": [7, 8, 9]} >>> dataset = tf.data.Dataset.from_tensor_slices(X_nested) >>> for item in dataset: ... print(item) ... {'a': (<tf.Tensor: [...]=1>, <tf.Tensor: [...]=4>), 'b': <tf.Tensor: [...]=7>} {'a': (<tf.Tensor: [...]=2>, <tf.Tensor: [...]=5>), 'b': <tf.Tensor: [...]=8>} {'a': (<tf.Tensor: [...]=3>, <tf.Tensor: [...]=6>), 'b': <tf.Tensor: [...]=9>}
链接转换
一旦您有了数据集,您可以通过调用其转换方法对其应用各种转换。每个方法都会返回一个新的数据集,因此您可以像这样链接转换(此链在图 13-1 中有示例):
>>> dataset = tf.data.Dataset.from_tensor_slices(tf.range(10)) >>> dataset = dataset.repeat(3).batch(7) >>> for item in dataset: ... print(item) ... tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32) tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32) tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32) tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32) tf.Tensor([8 9], shape=(2,), dtype=int32)
在这个例子中,我们首先在原始数据集上调用repeat()
方法,它返回一个将原始数据集的项目重复三次的新数据集。当然,这不会将所有数据在内存中复制三次!如果您调用此方法而没有参数,新数据集将永远重复源数据集,因此迭代数据集的代码将不得不决定何时停止。
然后我们在这个新数据集上调用batch()
方法,再次创建一个新数据集。这个新数据集将把前一个数据集的项目分组成七个项目一组的批次。
图 13-1. 链接数据集转换
最后,我们迭代这个最终数据集的项目。batch()
方法必须输出一个大小为两而不是七的最终批次,但是如果您希望删除这个最终批次,使所有批次具有完全相同的大小,可以调用batch()
并使用drop_remainder=True
。
警告
数据集方法不会修改数据集,它们会创建新的数据集。因此,请确保保留对这些新数据集的引用(例如,使用dataset = ...
),否则什么也不会发生。
您还可以通过调用map()
方法来转换项目。例如,这将创建一个所有批次乘以二的新数据集:
>>> dataset = dataset.map(lambda x: x * 2) # x is a batch >>> for item in dataset: ... print(item) ... tf.Tensor([ 0 2 4 6 8 10 12], shape=(7,), dtype=int32) tf.Tensor([14 16 18 0 2 4 6], shape=(7,), dtype=int32) [...]
这个map()
方法是您将调用的方法,用于对数据进行任何预处理。有时这将包括一些可能相当密集的计算,比如重塑或旋转图像,因此您通常会希望启动多个线程以加快速度。这可以通过将num_parallel_calls
参数设置为要运行的线程数,或者设置为tf.data.AUTOTUNE
来完成。请注意,您传递给map()
方法的函数必须可以转换为 TF 函数(请参阅第十二章)。
还可以使用filter()
方法简单地过滤数据集。例如,此代码创建一个仅包含总和大于 50 的批次的数据集:
>>> dataset = dataset.filter(lambda x: tf.reduce_sum(x) > 50) >>> for item in dataset: ... print(item) ... tf.Tensor([14 16 18 0 2 4 6], shape=(7,), dtype=int32) tf.Tensor([ 8 10 12 14 16 18 0], shape=(7,), dtype=int32) tf.Tensor([ 2 4 6 8 10 12 14], shape=(7,), dtype=int32)
您经常会想查看数据集中的一些项目。您可以使用take()
方法来实现:
>>> for item in dataset.take(2): ... print(item) ... tf.Tensor([14 16 18 0 2 4 6], shape=(7,), dtype=int32) tf.Tensor([ 8 10 12 14 16 18 0], shape=(7,), dtype=int32)
数据洗牌
正如我们在第四章中讨论的,梯度下降在训练集中的实例是独立且同分布(IID)时效果最好。确保这一点的一个简单方法是对实例进行洗牌,使用shuffle()
方法。它将创建一个新数据集,首先用源数据集的前几个项目填充缓冲区。然后,每当需要一个项目时,它将从缓冲区随机取出一个项目,并用源数据集中的新项目替换它,直到完全迭代源数据集。在这一点上,它将继续从缓冲区随机取出项目,直到缓冲区为空。您必须指定缓冲区大小,并且很重要的是要足够大,否则洗牌效果不会很好。¹ 只是不要超出您拥有的 RAM 量,尽管即使您有很多 RAM,也没有必要超出数据集的大小。如果您希望每次运行程序时都获得相同的随机顺序,可以提供一个随机种子。例如,以下代码创建并显示一个包含 0 到 9 的整数,重复两次,使用大小为 4 的缓冲区和随机种子 42 进行洗牌,并使用批次大小为 7 进行批处理的数据集:
>>> dataset = tf.data.Dataset.range(10).repeat(2) >>> dataset = dataset.shuffle(buffer_size=4, seed=42).batch(7) >>> for item in dataset: ... print(item) ... tf.Tensor([3 0 1 6 2 5 7], shape=(7,), dtype=int64) tf.Tensor([8 4 1 9 4 2 3], shape=(7,), dtype=int64) tf.Tensor([7 5 0 8 9 6], shape=(6,), dtype=int64)
提示
如果在打乱的数据集上调用repeat()
,默认情况下它将在每次迭代时生成一个新的顺序。这通常是个好主意,但是如果您希望在每次迭代中重复使用相同的顺序(例如,用于测试或调试),可以在调用shuffle()
时设置reshuffle_each_iteration=False
。
对于一个无法放入内存的大型数据集,这种简单的打乱缓冲区方法可能不够,因为缓冲区相对于数据集来说很小。一个解决方案是对源数据本身进行打乱(例如,在 Linux 上可以使用shuf
命令对文本文件进行打乱)。这将显著改善打乱效果!即使源数据已经被打乱,通常也会希望再次打乱,否则每个时期将重复相同的顺序,模型可能会出现偏差(例如,由于源数据顺序中偶然存在的一些虚假模式)。为了进一步打乱实例,一个常见的方法是将源数据拆分为多个文件,然后在训练过程中以随机顺序读取它们。然而,位于同一文件中的实例仍然会相互靠近。为了避免这种情况,您可以随机选择多个文件并同时读取它们,交错它们的记录。然后在此基础上使用shuffle()
方法添加一个打乱缓冲区。如果这听起来很费力,不用担心:tf.data API 可以在几行代码中实现所有这些。让我们看看您可以如何做到这一点。
从多个文件中交错行
首先,假设您已经加载了加利福尼亚房屋数据集,对其进行了打乱(除非已经打乱),并将其分为训练集、验证集和测试集。然后将每个集合分成许多 CSV 文件,每个文件看起来像这样(每行包含八个输入特征加上目标中位房价):
MedInc,HouseAge,AveRooms,AveBedrms,Popul…,AveOccup,Lat…,Long…,MedianHouseValue 3.5214,15.0,3.050,1.107,1447.0,1.606,37.63,-122.43,1.442 5.3275,5.0,6.490,0.991,3464.0,3.443,33.69,-117.39,1.687 3.1,29.0,7.542,1.592,1328.0,2.251,38.44,-122.98,1.621 [...]
假设train_filepaths
包含训练文件路径列表(您还有valid_filepaths
和test_filepaths
):
>>> train_filepaths ['datasets/housing/my_train_00.csv', 'datasets/housing/my_train_01.csv', ...]
或者,您可以使用文件模式;例如,train_filepaths =
"datasets/housing/my_train_*.csv"
。现在让我们创建一个仅包含这些文件路径的数据集:
filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed=42)
默认情况下,list_files()
函数返回一个打乱文件路径的数据集。一般来说这是件好事,但是如果出于某种原因不想要这样,可以设置shuffle=False
。
接下来,您可以调用interleave()
方法一次从五个文件中读取并交错它们的行。您还可以使用skip()
方法跳过每个文件的第一行(即标题行):
n_readers = 5 dataset = filepath_dataset.interleave( lambda filepath: tf.data.TextLineDataset(filepath).skip(1), cycle_length=n_readers)
interleave()
方法将创建一个数据集,从filepath_dataset
中提取五个文件路径,对于每个文件路径,它将调用您提供的函数(在本例中是 lambda 函数)来创建一个新的数据集(在本例中是TextLineDataset
)。清楚地说,在这个阶段总共会有七个数据集:文件路径数据集、交错数据集以及交错数据集内部创建的五个TextLineDataset
。当您迭代交错数据集时,它将循环遍历这五个TextLineDataset
,从每个数据集中逐行读取,直到所有数据集都用完。然后它将从filepath_dataset
中获取下一个五个文件路径,并以相同的方式交错它们,依此类推,直到文件路径用完。为了使交错效果最佳,最好拥有相同长度的文件;否则最长文件的末尾将不会被交错。
默认情况下,interleave()
不使用并行处理;它只是顺序地从每个文件中一次读取一行。如果您希望实际并行读取文件,可以将interleave()
方法的num_parallel_calls
参数设置为您想要的线程数(请记住,map()
方法也有这个参数)。甚至可以将其设置为tf.data.AUTOTUNE
,让 TensorFlow 根据可用的 CPU 动态选择正确的线程数。现在让我们看看数据集现在包含什么:
>>> for line in dataset.take(5): ... print(line) ... tf.Tensor(b'4.5909,16.0,[...],33.63,-117.71,2.418', shape=(), dtype=string) tf.Tensor(b'2.4792,24.0,[...],34.18,-118.38,2.0', shape=(), dtype=string) tf.Tensor(b'4.2708,45.0,[...],37.48,-122.19,2.67', shape=(), dtype=string) tf.Tensor(b'2.1856,41.0,[...],32.76,-117.12,1.205', shape=(), dtype=string) tf.Tensor(b'4.1812,52.0,[...],33.73,-118.31,3.215', shape=(), dtype=string)
这些是随机选择的五个 CSV 文件的第一行(忽略标题行)。看起来不错!
注意
可以将文件路径列表传递给 TextLineDataset
构造函数:它将按顺序遍历每个文件的每一行。如果还将 num_parallel_reads
参数设置为大于一的数字,那么数据集将并行读取该数量的文件,并交错它们的行(无需调用 interleave()
方法)。但是,它不会对文件进行洗牌,也不会跳过标题行。
数据预处理
现在我们有一个返回每个实例的住房数据集,其中包含一个字节字符串的张量,我们需要进行一些预处理,包括解析字符串和缩放数据。让我们实现一些自定义函数来执行这些预处理:
X_mean, X_std = [...] # mean and scale of each feature in the training set n_inputs = 8 def parse_csv_line(line): defs = [0.] * n_inputs + [tf.constant([], dtype=tf.float32)] fields = tf.io.decode_csv(line, record_defaults=defs) return tf.stack(fields[:-1]), tf.stack(fields[-1:]) def preprocess(line): x, y = parse_csv_line(line) return (x - X_mean) / X_std, y
让我们逐步解释这段代码:
- 首先,代码假设我们已经预先计算了训练集中每个特征的均值和标准差。
X_mean
和X_std
只是包含八个浮点数的 1D 张量(或 NumPy 数组),每个输入特征一个。可以使用 Scikit-Learn 的StandardScaler
在数据集的足够大的随机样本上完成这个操作。在本章的后面,我们将使用 Keras 预处理层来代替。 parse_csv_line()
函数接受一个 CSV 行并对其进行解析。为了帮助实现这一点,它使用tf.io.decode_csv()
函数,该函数接受两个参数:第一个是要解析的行,第二个是包含 CSV 文件中每列的默认值的数组。这个数组(defs
)告诉 TensorFlow 不仅每列的默认值是什么,还告诉它列的数量和类型。在这个例子中,我们告诉它所有特征列都是浮点数,缺失值应默认为零,但我们为最后一列(目标)提供了一个空的tf.float32
类型的默认值数组:该数组告诉 TensorFlow 这一列包含浮点数,但没有默认值,因此如果遇到缺失值,它将引发异常。tf.io.decode_csv()
函数返回一个标量张量列表(每列一个),但我们需要返回一个 1D 张量数组。因此,我们对除最后一个(目标)之外的所有张量调用tf.stack()
:这将这些张量堆叠成一个 1D 数组。然后我们对目标值做同样的操作:这将使其成为一个包含单个值的 1D 张量数组,而不是标量张量。tf.io.decode_csv()
函数完成后,它将返回输入特征和目标。- 最后,自定义的
preprocess()
函数只调用parse_csv_line()
函数,通过减去特征均值然后除以特征标准差来缩放输入特征,并返回一个包含缩放特征和目标的元组。
让我们测试这个预处理函数:
>>> preprocess(b'4.2083,44.0,5.3232,0.9171,846.0,2.3370,37.47,-122.2,2.782') (<tf.Tensor: shape=(8,), dtype=float32, numpy= array([ 0.16579159, 1.216324 , -0.05204564, -0.39215982, -0.5277444 , -0.2633488 , 0.8543046 , -1.3072058 ], dtype=float32)>, <tf.Tensor: shape=(1,), dtype=float32, numpy=array([2.782], dtype=float32)>)
看起来不错!preprocess()
函数可以将一个实例从字节字符串转换为一个漂亮的缩放张量,带有相应的标签。我们现在可以使用数据集的 map()
方法将 preprocess()
函数应用于数据集中的每个样本。
将所有内容放在一起
为了使代码更具重用性,让我们将迄今为止讨论的所有内容放在另一个辅助函数中;它将创建并返回一个数据集,该数据集将高效地从多个 CSV 文件中加载加利福尼亚房屋数据,对其进行预处理、洗牌和分批处理(参见图 13-2):
def csv_reader_dataset(filepaths, n_readers=5, n_read_threads=None, n_parse_threads=5, shuffle_buffer_size=10_000, seed=42, batch_size=32): dataset = tf.data.Dataset.list_files(filepaths, seed=seed) dataset = dataset.interleave( lambda filepath: tf.data.TextLineDataset(filepath).skip(1), cycle_length=n_readers, num_parallel_calls=n_read_threads) dataset = dataset.map(preprocess, num_parallel_calls=n_parse_threads) dataset = dataset.shuffle(shuffle_buffer_size, seed=seed) return dataset.batch(batch_size).prefetch(1)
请注意,我们在最后一行使用了 prefetch()
方法。这对性能很重要,你现在会看到。
图 13-2. 从多个 CSV 文件加载和预处理数据
预取
通过在自定义csv_reader_dataset()
函数末尾调用prefetch(1)
,我们正在创建一个数据集,该数据集将尽力始终领先一个批次。换句话说,当我们的训练算法在处理一个批次时,数据集将已经在并行工作,准备好获取下一个批次(例如,从磁盘读取数据并对其进行预处理)。这可以显著提高性能,如图 13-3 所示。
如果我们还确保加载和预处理是多线程的(通过在调用interleave()
和map()
时设置num_parallel_calls
),我们可以利用多个 CPU 核心,希望准备一个数据批次的时间比在 GPU 上运行训练步骤要短:这样 GPU 将几乎 100%利用(除了从 CPU 到 GPU 的数据传输时间)[3],训练将运行得更快。
图 13-3。通过预取,CPU 和 GPU 并行工作:当 GPU 处理一个批次时,CPU 处理下一个批次
提示
如果您计划购买 GPU 卡,其处理能力和内存大小当然非常重要(特别是对于大型计算机视觉或自然语言处理模型,大量的 RAM 至关重要)。对于良好性能同样重要的是 GPU 的内存带宽;这是它每秒可以将多少千兆字节的数据进出其 RAM。
如果数据集足够小,可以放入内存,您可以通过使用数据集的cache()
方法将其内容缓存到 RAM 来显着加快训练速度。通常应在加载和预处理数据之后,但在洗牌、重复、批处理和预取之前执行此操作。这样,每个实例只会被读取和预处理一次(而不是每个时期一次),但数据仍然会在每个时期以不同的方式洗牌,下一批数据仍然会提前准备好。
您现在已经学会了如何构建高效的输入管道,从多个文本文件加载和预处理数据。我们已经讨论了最常见的数据集方法,但还有一些您可能想看看的方法,例如concatenate()
、zip()
、window()
、reduce()
、shard()
、flat_map()
、apply()
、unbatch()
和padded_batch()
。还有一些更多的类方法,例如from_generator()
和from_tensors()
,它们分别从 Python 生成器或张量列表创建新数据集。请查看 API 文档以获取更多详细信息。还请注意,tf.data.experimental
中提供了一些实验性功能,其中许多功能可能会在未来的版本中成为核心 API 的一部分(例如,请查看CsvDataset
类,以及make_csv_dataset()
方法,该方法负责推断每列的类型)。
使用数据集与 Keras
现在,我们可以使用我们之前编写的自定义csv_reader_dataset()
函数为训练集、验证集和测试集创建数据集。训练集将在每个时期进行洗牌(请注意,验证集和测试集也将进行洗牌,尽管我们实际上并不需要):
train_set = csv_reader_dataset(train_filepaths) valid_set = csv_reader_dataset(valid_filepaths) test_set = csv_reader_dataset(test_filepaths)
现在,您可以简单地使用这些数据集构建和训练 Keras 模型。当您调用模型的fit()
方法时,您传递train_set
而不是X_train, y_train
,并传递validation_data=valid_set
而不是validation_data=(X_valid, y_valid)
。fit()
方法将负责每个时期重复训练数据集,每个时期使用不同的随机顺序:
model = tf.keras.Sequential([...]) model.compile(loss="mse", optimizer="sgd") model.fit(train_set, validation_data=valid_set, epochs=5)
同样,您可以将数据集传递给evaluate()
和predict()
方法:
test_mse = model.evaluate(test_set) new_set = test_set.take(3) # pretend we have 3 new samples y_pred = model.predict(new_set) # or you could just pass a NumPy array
与其他数据集不同,new_set
通常不包含标签。如果包含标签,就像这里一样,Keras 会忽略它们。请注意,在所有这些情况下,您仍然可以使用 NumPy 数组而不是数据集(但当然它们需要先加载和预处理)。
如果您想构建自己的自定义训练循环(如第十二章中讨论的),您可以很自然地遍历训练集:
n_epochs = 5 for epoch in range(n_epochs): for X_batch, y_batch in train_set: [...] # perform one gradient descent step
实际上,甚至可以创建一个 TF 函数(参见第十二章),用于整个时期训练模型。这可以真正加快训练速度:
@tf.function def train_one_epoch(model, optimizer, loss_fn, train_set): for X_batch, y_batch in train_set: with tf.GradientTape() as tape: y_pred = model(X_batch) main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred)) loss = tf.add_n([main_loss] + model.losses) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) optimizer = tf.keras.optimizers.SGD(learning_rate=0.01) loss_fn = tf.keras.losses.mean_squared_error for epoch in range(n_epochs): print("\rEpoch {}/{}".format(epoch + 1, n_epochs), end="") train_one_epoch(model, optimizer, loss_fn, train_set)
在 Keras 中,compile()
方法的steps_per_execution
参数允许您定义fit()
方法在每次调用用于训练的tf.function
时将处理的批次数。默认值只是 1,因此如果将其设置为 50,您通常会看到显着的性能改进。但是,Keras 回调的on_batch_*()
方法只会在每 50 批次时调用一次。
恭喜,您现在知道如何使用 tf.data API 构建强大的输入管道!然而,到目前为止,我们一直在使用常见、简单和方便但不是真正高效的 CSV 文件,并且不太支持大型或复杂的数据结构(如图像或音频)。因此,让我们看看如何改用 TFRecords。
提示
如果您对 CSV 文件(或者您正在使用的其他格式)感到满意,您不一定必须使用 TFRecords。俗话说,如果它没有坏,就不要修理!当训练过程中的瓶颈是加载和解析数据时,TFRecords 非常有用。
TFRecord 格式
TFRecord 格式是 TensorFlow 存储大量数据并高效读取的首选格式。它是一个非常简单的二进制格式,只包含一系列大小不同的二进制记录(每个记录由长度、用于检查长度是否损坏的 CRC 校验和、实际数据,最后是数据的 CRC 校验和组成)。您可以使用tf.io.TFRecordWriter
类轻松创建 TFRecord 文件:
with tf.io.TFRecordWriter("my_data.tfrecord") as f: f.write(b"This is the first record") f.write(b"And this is the second record")
然后,您可以使用tf.data.TFRecordDataset
来读取一个或多个 TFRecord 文件:
filepaths = ["my_data.tfrecord"] dataset = tf.data.TFRecordDataset(filepaths) for item in dataset: print(item)
这将输出:
tf.Tensor(b'This is the first record', shape=(), dtype=string) tf.Tensor(b'And this is the second record', shape=(), dtype=string) • 1 • 2
提示
默认情况下,TFRecordDataset
将逐个读取文件,但您可以使其并行读取多个文件,并通过传递文件路径列表给构造函数并将num_parallel_reads
设置为大于 1 的数字来交错它们的记录。或者,您可以通过使用list_files()
和interleave()
来获得与我们之前读取多个 CSV 文件相同的结果。
压缩的 TFRecord 文件
有时将 TFRecord 文件压缩可能很有用,特别是如果它们需要通过网络连接加载。您可以通过设置options
参数创建一个压缩的 TFRecord 文件:
options = tf.io.TFRecordOptions(compression_type="GZIP") with tf.io.TFRecordWriter("my_compressed.tfrecord", options) as f: f.write(b"Compress, compress, compress!")
在读取压缩的 TFRecord 文件时,您需要指定压缩类型:
dataset = tf.data.TFRecordDataset(["my_compressed.tfrecord"], compression_type="GZIP")
协议缓冲区简介
尽管每个记录可以使用您想要的任何二进制格式,但 TFRecord 文件通常包含序列化的协议缓冲区(也称为protobufs)。这是一个在 2001 年由谷歌开发的便携式、可扩展和高效的二进制格式,并于 2008 年开源;protobufs 现在被广泛使用,特别是在grpc中,谷歌的远程过程调用系统。它们使用一个看起来像这样的简单语言进行定义:
syntax = "proto3"; message Person { string name = 1; int32 id = 2; repeated string email = 3; }
这个 protobuf 定义表示我们正在使用 protobuf 格式的第 3 版,并且指定每个Person
对象(可选)可能具有一个字符串类型的name
、一个 int32 类型的id
,以及零个或多个字符串类型的email
字段。数字1
、2
和3
是字段标识符:它们将在每个记录的二进制表示中使用。一旦你在*.proto文件中有了一个定义,你就可以编译它。这需要使用 protobuf 编译器protoc
在 Python(或其他语言)中生成访问类。请注意,你通常在 TensorFlow 中使用的 protobuf 定义已经为你编译好了,并且它们的 Python 类是 TensorFlow 库的一部分,因此你不需要使用protoc
。你只需要知道如何在 Python 中使用*protobuf 访问类。为了说明基础知识,让我们看一个简单的示例,使用为Person
protobuf 生成的访问类(代码在注释中有解释):
>>> from person_pb2 import Person # import the generated access class >>> person = Person(name="Al", id=123, email=["a@b.com"]) # create a Person >>> print(person) # display the Person name: "Al" id: 123 email: "a@b.com" >>> person.name # read a field 'Al' >>> person.name = "Alice" # modify a field >>> person.email[0] # repeated fields can be accessed like arrays 'a@b.com' >>> person.email.append("c@d.com") # add an email address >>> serialized = person.SerializeToString() # serialize person to a byte string >>> serialized b'\n\x05Alice\x10{\x1a\x07a@b.com\x1a\x07c@d.com' >>> person2 = Person() # create a new Person >>> person2.ParseFromString(serialized) # parse the byte string (27 bytes long) 27 >>> person == person2 # now they are equal True
简而言之,我们导入由protoc
生成的Person
类,创建一个实例并对其进行操作,可视化它并读取和写入一些字段,然后使用SerializeToString()
方法对其进行序列化。这是准备保存或通过网络传输的二进制数据。当读取或接收这些二进制数据时,我们可以使用ParseFromString()
方法进行解析,并获得被序列化的对象的副本。
你可以将序列化的Person
对象保存到 TFRecord 文件中,然后加载和解析它:一切都会正常工作。然而,ParseFromString()
不是一个 TensorFlow 操作,所以你不能在 tf.data 管道中的预处理函数中使用它(除非将其包装在tf.py_function()
操作中,这会使代码变慢且不太可移植,正如你在第十二章中看到的)。然而,你可以使用tf.io.decode_proto()
函数,它可以解析任何你想要的 protobuf,只要你提供 protobuf 定义(请参考笔记本中的示例)。也就是说,在实践中,你通常会希望使用 TensorFlow 提供的专用解析操作的预定义 protobuf。现在让我们来看看这些预定义的 protobuf。
TensorFlow Protobufs
TFRecord 文件中通常使用的主要 protobuf 是Example
protobuf,它表示数据集中的一个实例。它包含一个命名特征列表,其中每个特征可以是一个字节字符串列表、一个浮点数列表或一个整数列表。以下是 protobuf 定义(来自 TensorFlow 源代码):
syntax = "proto3"; message BytesList { repeated bytes value = 1; } message FloatList { repeated float value = 1 [packed = true]; } message Int64List { repeated int64 value = 1 [packed = true]; } message Feature { oneof kind { BytesList bytes_list = 1; FloatList float_list = 2; Int64List int64_list = 3; } }; message Features { map<string, Feature> feature = 1; }; message Example { Features features = 1; };
BytesList
、FloatList
和Int64List
的定义足够简单明了。请注意,对于重复的数值字段,使用[packed = true]
进行更有效的编码。Feature
包含一个BytesList
、一个FloatList
或一个Int64List
。一个Features
(带有s
)包含一个将特征名称映射到相应特征值的字典。最后,一个Example
只包含一个Features
对象。
注意
为什么会定义Example
,因为它只包含一个Features
对象?嗯,TensorFlow 的开发人员可能有一天决定向其中添加更多字段。只要新的Example
定义仍然包含相同 ID 的features
字段,它就是向后兼容的。这种可扩展性是 protobuf 的一个伟大特性。
这是你如何创建一个代表同一个人的tf.train.Example
:
from tensorflow.train import BytesList, FloatList, Int64List from tensorflow.train import Feature, Features, Example person_example = Example( features=Features( feature={ "name": Feature(bytes_list=BytesList(value=[b"Alice"])), "id": Feature(int64_list=Int64List(value=[123])), "emails": Feature(bytes_list=BytesList(value=[b"a@b.com", b"c@d.com"])) }))
这段代码有点冗长和重复,但你可以很容易地将其包装在一个小的辅助函数中。现在我们有了一个Example
protobuf,我们可以通过调用其SerializeToString()
方法将其序列化,然后将生成的数据写入 TFRecord 文件。让我们假装写入五次,以假装我们有几个联系人:
with tf.io.TFRecordWriter("my_contacts.tfrecord") as f: for _ in range(5): f.write(person_example.SerializeToString())
通常,您会写比五个Example
更多的内容!通常情况下,您会创建一个转换脚本,从当前格式(比如 CSV 文件)读取数据,为每个实例创建一个Example
protobuf,将它们序列化,并保存到几个 TFRecord 文件中,最好在此过程中对它们进行洗牌。这需要一些工作,所以再次确保这确实是必要的(也许您的流水线使用 CSV 文件运行良好)。
现在我们有一个包含多个序列化Example
的漂亮 TFRecord 文件,让我们尝试加载它。
加载和解析示例
为了加载序列化的Example
protobufs,我们将再次使用tf.data.TFRecordDataset
,并使用tf.io.parse_single_example()
解析每个Example
。它至少需要两个参数:包含序列化数据的字符串标量张量,以及每个特征的描述。描述是一个字典,将每个特征名称映射到tf.io.FixedLenFeature
描述符,指示特征的形状、类型和默认值,或者tf.io.VarLenFeature
描述符,仅指示特征列表的长度可能变化的类型(例如"emails"
特征)。
以下代码定义了一个描述字典,然后创建了一个TFRecordDataset
,并对其应用了一个自定义预处理函数,以解析该数据集包含的每个序列化Example
protobuf:
feature_description = { "name": tf.io.FixedLenFeature([], tf.string, default_value=""), "id": tf.io.FixedLenFeature([], tf.int64, default_value=0), "emails": tf.io.VarLenFeature(tf.string), } def parse(serialized_example): return tf.io.parse_single_example(serialized_example, feature_description) dataset = tf.data.TFRecordDataset(["my_contacts.tfrecord"]).map(parse) for parsed_example in dataset: print(parsed_example)
固定长度的特征被解析为常规张量,但变长特征被解析为稀疏张量。您可以使用tf.sparse.to_dense()
将稀疏张量转换为密集张量,但在这种情况下,更简单的方法是直接访问其值:
>>> tf.sparse.to_dense(parsed_example["emails"], default_value=b"") <tf.Tensor: [...] dtype=string, numpy=array([b'a@b.com', b'c@d.com'], [...])> >>> parsed_example["emails"].values <tf.Tensor: [...] dtype=string, numpy=array([b'a@b.com', b'c@d.com'], [...])>
您可以使用tf.io.parse_example()
批量解析示例,而不是使用tf.io.parse_single_example()
逐个解析它们:
def parse(serialized_examples): return tf.io.parse_example(serialized_examples, feature_description) dataset = tf.data.TFRecordDataset(["my_contacts.tfrecord"]).batch(2).map(parse) for parsed_examples in dataset: print(parsed_examples) # two examples at a time
最后,BytesList
可以包含您想要的任何二进制数据,包括任何序列化对象。例如,您可以使用tf.io.encode_jpeg()
使用 JPEG 格式对图像进行编码,并将这些二进制数据放入BytesList
中。稍后,当您的代码读取 TFRecord 时,它将从解析Example
开始,然后需要调用tf.io.decode_jpeg()
来解析数据并获取原始图像(或者您可以使用tf.io.decode_image()
,它可以解码任何 BMP、GIF、JPEG 或 PNG 图像)。您还可以通过使用tf.io.serialize_tensor()
对张量进行序列化,然后将生成的字节字符串放入BytesList
特征中,将任何您想要的张量存储在BytesList
中。稍后,当您解析 TFRecord 时,您可以使用tf.io.parse_tensor()
解析这些数据。请参阅本章的笔记本https://homl.info/colab3 ,了解在 TFRecord 文件中存储图像和张量的示例。
正如您所看到的,Example
protobuf 非常灵活,因此对于大多数用例来说可能已经足够了。但是,当您处理列表列表时,可能会有些繁琐。例如,假设您想对文本文档进行分类。每个文档可以表示为一个句子列表,其中每个句子表示为一个单词列表。也许每个文档还有一个评论列表,其中每个评论表示为一个单词列表。还可能有一些上下文数据,比如文档的作者、标题和发布日期。TensorFlow 的SequenceExample
protobuf 就是为这种用例而设计的。
Sklearn、TensorFlow 与 Keras 机器学习实用指南第三版(五)(4)https://developer.aliyun.com/article/1482437