Sklearn、TensorFlow 与 Keras 机器学习实用指南第三版(五)(1)https://developer.aliyun.com/article/1482430
自定义层
有时候你可能想要构建一个包含一种 TensorFlow 没有提供默认实现的奇特层的架构。或者你可能只是想要构建一个非常重复的架构,在这种架构中,一个特定的层块被重复多次,将每个块视为单个层会很方便。对于这些情况,你会想要构建一个自定义层。
有一些没有权重的层,比如tf.keras.layers.Flatten
或tf.keras.layers.ReLU
。如果你想创建一个没有任何权重的自定义层,最简单的方法是编写一个函数并将其包装在tf.keras.layers.Lambda
层中。例如,以下层将对其输入应用指数函数:
exponential_layer = tf.keras.layers.Lambda(lambda x: tf.exp(x))
然后,这个自定义层可以像任何其他层一样使用,使用序贯 API、函数式 API 或子类 API。你也可以将它用作激活函数,或者你可以使用activation=tf.exp
。指数层有时用于回归模型的输出层,当要预测的值具有非常不同的规模时(例如,0.001、10.、1,000.)。事实上,指数函数是 Keras 中的标准激活函数之一,所以你可以简单地使用activation="exponential"
。
你可能会猜到,要构建一个自定义的有状态层(即带有权重的层),你需要创建tf.keras.layers.Layer
类的子类。例如,以下类实现了Dense
层的简化版本:
class MyDense(tf.keras.layers.Layer): def __init__(self, units, activation=None, **kwargs): super().__init__(**kwargs) self.units = units self.activation = tf.keras.activations.get(activation) def build(self, batch_input_shape): self.kernel = self.add_weight( name="kernel", shape=[batch_input_shape[-1], self.units], initializer="glorot_normal") self.bias = self.add_weight( name="bias", shape=[self.units], initializer="zeros") def call(self, X): return self.activation(X @ self.kernel + self.bias) def get_config(self): base_config = super().get_config() return {**base_config, "units": self.units, "activation": tf.keras.activations.serialize(self.activation)}
让我们来看看这段代码:
- 构造函数将所有超参数作为参数(在这个例子中是
units
和activation
),并且重要的是它还接受一个**kwargs
参数。它调用父构造函数,将kwargs
传递给它:这会处理标准参数,如input_shape
、trainable
和name
。然后它将超参数保存为属性,使用tf.keras.activations.get()
函数将activation
参数转换为适当的激活函数(它接受函数、标准字符串如"relu"
或"swish"
,或者简单地None
)。 build()
方法的作用是通过为每个权重调用add_weight()
方法来创建层的变量。build()
方法在第一次使用该层时被调用。在那时,Keras 将知道该层输入的形状,并将其传递给build()
方法,这通常是创建一些权重所必需的。例如,我们需要知道前一层中的神经元数量以创建连接权重矩阵(即"kernel"
):这对应于输入的最后一个维度的大小。在build()
方法的最后(仅在最后),您必须调用父类的build()
方法:这告诉 Keras 该层已构建(它只是设置self.built = True
)。call()
方法执行所需的操作。在这种情况下,我们计算输入X
和层的内核的矩阵乘法,添加偏置向量,并将激活函数应用于结果,这给出了层的输出。get_config()
方法与以前的自定义类中的方法一样。请注意,通过调用tf.keras.activations.serialize()
保存激活函数的完整配置。
现在您可以像使用任何其他层一样使用MyDense
层!
注意
Keras 会自动推断输出形状,除非该层是动态的(稍后将看到)。在这种(罕见)情况下,您需要实现compute_output_shape()
方法,该方法必须返回一个TensorShape
对象。
要创建具有多个输入的层(例如,Concatenate
),call()
方法的参数应该是一个包含所有输入的元组。要创建具有多个输出的层,call()
方法应该返回输出的列表。例如,以下示例玩具层接受两个输入并返回三个输出:
class MyMultiLayer(tf.keras.layers.Layer): def call(self, X): X1, X2 = X return X1 + X2, X1 * X2, X1 / X2
这个层现在可以像任何其他层一样使用,但当然只能使用功能 API 和子类 API,而不能使用顺序 API(顺序 API 只接受具有一个输入和一个输出的层)。
如果您的层在训练和测试期间需要具有不同的行为(例如,如果它使用Dropout
或BatchNormalization
层),那么您必须在call()
方法中添加一个training
参数,并使用此参数来决定要执行什么操作。例如,让我们创建一个在训练期间添加高斯噪声(用于正则化)但在测试期间不执行任何操作的层(Keras 有一个执行相同操作的层,tf.keras.layers.GaussianNoise
):
class MyGaussianNoise(tf.keras.layers.Layer): def __init__(self, stddev, **kwargs): super().__init__(**kwargs) self.stddev = stddev def call(self, X, training=False): if training: noise = tf.random.normal(tf.shape(X), stddev=self.stddev) return X + noise else: return X
有了这个,您现在可以构建任何您需要的自定义层!现在让我们看看如何创建自定义模型。
自定义模型
我们已经在第十章中讨论了使用子类 API 创建自定义模型类。这很简单:子类化tf.keras.Model
类,在构造函数中创建层和变量,并实现call()
方法以执行您希望模型执行的操作。例如,假设我们想要构建图 12-3 中表示的模型。
图 12-3。自定义模型示例:一个包含跳过连接的自定义ResidualBlock
层的任意模型
输入首先经过一个密集层,然后通过由两个密集层和一个加法操作组成的残差块(如您将在第十四章中看到的,残差块将其输入添加到其输出中),然后通过这个相同的残差块再进行三次,然后通过第二个残差块,最终结果通过一个密集输出层。如果这个模型看起来没有太多意义,不要担心;这只是一个示例,说明您可以轻松构建任何您想要的模型,甚至包含循环和跳过连接的模型。要实现这个模型,最好首先创建一个ResidualBlock
层,因为我们将创建一对相同的块(并且可能希望在另一个模型中重用它):
class ResidualBlock(tf.keras.layers.Layer): def __init__(self, n_layers, n_neurons, **kwargs): super().__init__(**kwargs) self.hidden = [tf.keras.layers.Dense(n_neurons, activation="relu", kernel_initializer="he_normal") for _ in range(n_layers)] def call(self, inputs): Z = inputs for layer in self.hidden: Z = layer(Z) return inputs + Z
这个层有点特殊,因为它包含其他层。Keras 会自动处理这一点:它会自动检测hidden
属性包含可跟踪对象(在这种情况下是层),因此它们的变量会自动添加到此层的变量列表中。这个类的其余部分是不言自明的。接下来,让我们使用子类 API 来定义模型本身:
class ResidualRegressor(tf.keras.Model): def __init__(self, output_dim, **kwargs): super().__init__(**kwargs) self.hidden1 = tf.keras.layers.Dense(30, activation="relu", kernel_initializer="he_normal") self.block1 = ResidualBlock(2, 30) self.block2 = ResidualBlock(2, 30) self.out = tf.keras.layers.Dense(output_dim) def call(self, inputs): Z = self.hidden1(inputs) for _ in range(1 + 3): Z = self.block1(Z) Z = self.block2(Z) return self.out(Z)
我们在构造函数中创建层,并在call()
方法中使用它们。然后可以像任何其他模型一样使用此模型(编译、拟合、评估和使用它进行预测)。如果您还希望能够使用save()
方法保存模型,并使用tf.keras.models.load_model()
函数加载模型,则必须在ResidualBlock
类和ResidualRegressor
类中实现get_config()
方法(就像我们之前做的那样)。或者,您可以使用save_weights()
和load_weights()
方法保存和加载权重。
Model
类是Layer
类的子类,因此模型可以像层一样定义和使用。但是模型具有一些额外的功能,包括当然包括compile()
、fit()
、evaluate()
和predict()
方法(以及一些变体),还有get_layer()
方法(可以通过名称或索引返回模型的任何层)和save()
方法(以及对tf.keras.models.load_model()
和tf.keras.models.clone_model()
的支持)。
提示
如果模型提供的功能比层更多,为什么不将每个层都定义为模型呢?技术上您可以这样做,但通常更清晰的做法是区分模型的内部组件(即层或可重用的层块)和模型本身(即您将训练的对象)。前者应该是Layer
类的子类,而后者应该是Model
类的子类。
有了这些,您可以自然而简洁地构建几乎任何您在论文中找到的模型,使用顺序 API、函数 API、子类 API,甚至这些的混合。“几乎”任何模型?是的,还有一些事情我们需要看一下:首先是如何基于模型内部定义损失或指标,其次是如何构建自定义训练循环。
基于模型内部的损失和指标
我们之前定义的自定义损失和指标都是基于标签和预测(以及可选的样本权重)。有时您可能希望基于模型的其他部分(例如其隐藏层的权重或激活)定义损失。这可能对正则化目的或监视模型的某些内部方面很有用。
要基于模型内部定义自定义损失,可以根据模型的任何部分计算损失,然后将结果传递给add_loss()
方法。例如,让我们构建一个由五个隐藏层堆叠加一个输出层组成的自定义回归 MLP 模型。这个自定义模型还将在最上面的隐藏层之上具有一个辅助输出。与这个辅助输出相关联的损失将被称为重建损失(参见第十七章):它是重建和输入之间的均方差差异。通过将这个重建损失添加到主要损失中,我们将鼓励模型通过隐藏层尽可能保留更多信息,即使这些信息对于回归任务本身并不直接有用。在实践中,这种损失有时会改善泛化能力(它是一种正则化损失)。还可以使用模型的add_metric()
方法添加自定义指标。以下是具有自定义重建损失和相应指标的自定义模型的代码:
class ReconstructingRegressor(tf.keras.Model): def __init__(self, output_dim, **kwargs): super().__init__(**kwargs) self.hidden = [tf.keras.layers.Dense(30, activation="relu", kernel_initializer="he_normal") for _ in range(5)] self.out = tf.keras.layers.Dense(output_dim) self.reconstruction_mean = tf.keras.metrics.Mean( name="reconstruction_error") def build(self, batch_input_shape): n_inputs = batch_input_shape[-1] self.reconstruct = tf.keras.layers.Dense(n_inputs) def call(self, inputs, training=False): Z = inputs for layer in self.hidden: Z = layer(Z) reconstruction = self.reconstruct(Z) recon_loss = tf.reduce_mean(tf.square(reconstruction - inputs)) self.add_loss(0.05 * recon_loss) if training: result = self.reconstruction_mean(recon_loss) self.add_metric(result) return self.out(Z)
让我们来看一下这段代码:
- 构造函数创建了一个具有五个密集隐藏层和一个密集输出层的 DNN。我们还创建了一个
Mean
流式指标,用于在训练过程中跟踪重建误差。 build()
方法创建一个额外的密集层,用于重构模型的输入。它必须在这里创建,因为其单元数必须等于输入的数量,在调用build()
方法之前这个数量是未知的。call()
方法通过所有五个隐藏层处理输入,然后将结果传递给重构层,该层生成重构。- 然后
call()
方法计算重构损失(重构和输入之间的均方差),并使用add_loss()
方法将其添加到模型的损失列表中。请注意,我们通过将重构损失乘以 0.05 来缩小重构损失(这是一个可以调整的超参数)。这确保了重构损失不会主导主要损失。 - 接下来,在训练过程中,
call()
方法更新重构度量并将其添加到模型中以便显示。这段代码示例实际上可以通过调用self.add_metric(recon_loss)
来简化:Keras 将自动为您跟踪均值。 - 最后,
call()
方法将隐藏层的输出传递给输出层,并返回其输出。
在训练过程中,总损失和重构损失都会下降:
Epoch 1/5 363/363 [========] - 1s 820us/step - loss: 0.7640 - reconstruction_error: 1.2728 Epoch 2/5 363/363 [========] - 0s 809us/step - loss: 0.4584 - reconstruction_error: 0.6340 [...]
在大多数情况下,到目前为止我们讨论的一切将足以实现您想构建的任何模型,即使是具有复杂架构、损失和指标。然而,对于一些架构,如 GANs(参见第十七章),您将不得不自定义训练循环本身。在我们到达那里之前,我们必须看看如何在 TensorFlow 中自动计算梯度。
使用自动微分计算梯度
要了解如何使用自动微分(参见第十章和附录 B)自动计算梯度,让我们考虑一个简单的玩具函数:
def f(w1, w2): return 3 * w1 ** 2 + 2 * w1 * w2
如果你懂微积分,你可以分析地找到这个函数相对于w1
的偏导数是6 * w1
+
2 * w2
。你也可以找到它相对于w2
的偏导数是2 * w1
。例如,在点(w1, w2)
=
(5, 3)
,这些偏导数分别等于 36 和 10,因此在这一点的梯度向量是(36,10)。但如果这是一个神经网络,这个函数会复杂得多,通常有数万个参数,通过手工分析找到偏导数将是一个几乎不可能的任务。一个解决方案是通过测量当你微调相应参数一点点时函数的输出如何变化来计算每个偏导数的近似值:
>>> w1, w2 = 5, 3 >>> eps = 1e-6 >>> (f(w1 + eps, w2) - f(w1, w2)) / eps 36.000003007075065 >>> (f(w1, w2 + eps) - f(w1, w2)) / eps 10.000000003174137
看起来不错!这个方法运行得相当好,而且易于实现,但它只是一个近似值,重要的是你需要至少针对每个参数调用一次f()
(不是两次,因为我们可以只计算一次f(w1, w2)
)。每个参数至少调用一次f()
使得这种方法在大型神经网络中变得难以处理。因此,我们应该使用反向模式自动微分。TensorFlow 使这变得非常简单:
w1, w2 = tf.Variable(5.), tf.Variable(3.) with tf.GradientTape() as tape: z = f(w1, w2) gradients = tape.gradient(z, [w1, w2])
首先我们定义两个变量w1
和w2
,然后我们创建一个tf.GradientTape
上下文,它将自动记录涉及变量的每个操作,最后我们要求这个磁带计算结果z
相对于两个变量[w1, w2]
的梯度。让我们看看 TensorFlow 计算的梯度:
>>> gradients [<tf.Tensor: shape=(), dtype=float32, numpy=36.0>, <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]
太棒了!结果不仅准确(精度仅受浮点误差限制),而且gradient()
方法只需通过记录的计算一次(按相反顺序),无论有多少变量,因此非常高效。就像魔术一样!
提示
为了节省内存,在tf.GradientTape()
块中只放入严格的最小值。或者,通过在tf.GradientTape()
块内创建一个with tape.stop_recording()
块来暂停记录。
在调用其gradient()
方法后,磁带会立即被擦除,因此如果尝试两次调用gradient()
,将会收到异常:
with tf.GradientTape() as tape: z = f(w1, w2) dz_dw1 = tape.gradient(z, w1) # returns tensor 36.0 dz_dw2 = tape.gradient(z, w2) # raises a RuntimeError!
如果您需要多次调用gradient()
,您必须使磁带持久化,并在每次完成后删除它以释放资源:
with tf.GradientTape(persistent=True) as tape: z = f(w1, w2) dz_dw1 = tape.gradient(z, w1) # returns tensor 36.0 dz_dw2 = tape.gradient(z, w2) # returns tensor 10.0, works fine now! del tape
默认情况下,磁带只会跟踪涉及变量的操作,因此,如果您尝试计算z
相对于除变量以外的任何东西的梯度,结果将是None
:
c1, c2 = tf.constant(5.), tf.constant(3.) with tf.GradientTape() as tape: z = f(c1, c2) gradients = tape.gradient(z, [c1, c2]) # returns [None, None]
但是,您可以强制磁带监视任何您喜欢的张量,记录涉及它们的每个操作。然后,您可以计算相对于这些张量的梯度,就像它们是变量一样:
with tf.GradientTape() as tape: tape.watch(c1) tape.watch(c2) z = f(c1, c2) gradients = tape.gradient(z, [c1, c2]) # returns [tensor 36., tensor 10.]
在某些情况下,这可能很有用,比如如果您想要实现一个正则化损失,惩罚激活在输入变化很小时变化很大的情况:损失将基于激活相对于输入的梯度。由于输入不是变量,您需要告诉磁带监视它们。
大多数情况下,梯度磁带用于计算单个值(通常是损失)相对于一组值(通常是模型参数)的梯度。这就是反向模式自动微分的优势所在,因为它只需要进行一次前向传递和一次反向传递就可以一次性获得所有梯度。如果尝试计算向量的梯度,例如包含多个损失的向量,那么 TensorFlow 将计算向量总和的梯度。因此,如果您需要获取各个梯度(例如,每个损失相对于模型参数的梯度),您必须调用磁带的jacobian()
方法:它将为向量中的每个损失执行一次反向模式自动微分(默认情况下全部并行)。甚至可以计算二阶偏导数(Hessians,即偏导数的偏导数),但在实践中很少需要(请参阅本章笔记本的“使用自动微分计算梯度”部分以获取示例)。
在某些情况下,您可能希望阻止梯度通过神经网络的某些部分进行反向传播。为此,您必须使用tf.stop_gradient()
函数。该函数在前向传递期间返回其输入(类似于tf.identity()
),但在反向传播期间不允许梯度通过(它的作用类似于常数):
def f(w1, w2): return 3 * w1 ** 2 + tf.stop_gradient(2 * w1 * w2) with tf.GradientTape() as tape: z = f(w1, w2) # the forward pass is not affected by stop_gradient() gradients = tape.gradient(z, [w1, w2]) # returns [tensor 30., None]
最后,当计算梯度时,您可能偶尔会遇到一些数值问题。例如,如果在x=10^(-50)处计算平方根函数的梯度,结果将是无穷大。实际上,该点的斜率并不是无穷大,但它超过了 32 位浮点数的处理能力:
>>> x = tf.Variable(1e-50) >>> with tf.GradientTape() as tape: ... z = tf.sqrt(x) ... >>> tape.gradient(z, [x]) [<tf.Tensor: shape=(), dtype=float32, numpy=inf>]
为了解决这个问题,在计算平方根时,通常建议向x(例如 10^(-6))添加一个微小值。
指数函数也经常引起头痛,因为它增长非常快。例如,之前定义的my_softplus()
的方式在数值上不稳定。如果计算my_softplus(100.0)
,您将得到无穷大而不是正确的结果(约为 100)。但是可以重写该函数以使其在数值上稳定:softplus 函数被定义为 log(1 + exp(z)),这也等于 log(1 + exp(–|z|)) + max(z, 0)(请参阅数学证明的笔记本),第二种形式的优势在于指数项不会爆炸。因此,这是my_softplus()
函数的更好实现:
def my_softplus(z): return tf.math.log(1 + tf.exp(-tf.abs(z))) + tf.maximum(0., z)
在一些罕见的情况下,一个数值稳定的函数可能仍然具有数值不稳定的梯度。在这种情况下,你将不得不告诉 TensorFlow 使用哪个方程来计算梯度,而不是让它使用自动微分。为此,你必须在定义函数时使用@tf.custom_gradient
装饰器,并返回函数的通常结果以及计算梯度的函数。例如,让我们更新my_softplus()
函数,使其也返回一个数值稳定的梯度函数:
@tf.custom_gradient def my_softplus(z): def my_softplus_gradients(grads): # grads = backprop'ed from upper layers return grads * (1 - 1 / (1 + tf.exp(z))) # stable grads of softplus result = tf.math.log(1 + tf.exp(-tf.abs(z))) + tf.maximum(0., z) return result, my_softplus_gradients
如果你懂微积分(参见关于这个主题的教程笔记本),你会发现 log(1 + exp(z))的导数是 exp(z) / (1 + exp(z))。但这种形式是不稳定的:对于较大的z值,它最终会计算出无穷大除以无穷大,返回 NaN。然而,通过一点代数操作,你可以证明它也等于 1 - 1 / (1 + exp(z)),这是稳定的。my_softplus_gradients()
函数使用这个方程来计算梯度。请注意,这个函数将接收到目前为止反向传播的梯度,一直到my_softplus()
函数,并根据链式法则,我们必须将它们与这个函数的梯度相乘。
现在当我们计算my_softplus()
函数的梯度时,即使对于较大的输入值,我们也会得到正确的结果。
恭喜!现在你可以计算任何函数的梯度(只要在计算时它是可微的),甚至在需要时阻止反向传播,并编写自己的梯度函数!这可能比你需要的灵活性更多,即使你构建自己的自定义训练循环。接下来你将看到如何做到这一点。
自定义训练循环
在某些情况下,fit()
方法可能不够灵活以满足你的需求。例如,我们在第十章中讨论的Wide & Deep 论文使用了两种不同的优化器:一种用于宽路径,另一种用于深路径。由于fit()
方法只使用一个优化器(在编译模型时指定的那个),实现这篇论文需要编写自己的自定义循环。
你可能也喜欢编写自定义训练循环,只是为了更有信心地确保它们确实按照你的意图执行(也许你对fit()
方法的一些细节不确定)。有时候,让一切都显式化可能会感觉更安全。然而,请记住,编写自定义训练循环会使你的代码变得更长、更容易出错,并且更难维护。
提示
除非你在学习或确实需要额外的灵活性,否则应该优先使用fit()
方法而不是实现自己的训练循环,特别是如果你在团队中工作。
首先,让我们构建一个简单的模型。不需要编译它,因为我们将手动处理训练循环:
l2_reg = tf.keras.regularizers.l2(0.05) model = tf.keras.models.Sequential([ tf.keras.layers.Dense(30, activation="relu", kernel_initializer="he_normal", kernel_regularizer=l2_reg), tf.keras.layers.Dense(1, kernel_regularizer=l2_reg) ])
接下来,让我们创建一个小函数,从训练集中随机抽取一个批次的实例(在第十三章中,我们将讨论 tf.data API,它提供了一个更好的替代方案):
def random_batch(X, y, batch_size=32): idx = np.random.randint(len(X), size=batch_size) return X[idx], y[idx]
让我们还定义一个函数,用于显示训练状态,包括步数、总步数、自开始时的平均损失(我们将使用Mean
指标来计算),以及其他指标:
def print_status_bar(step, total, loss, metrics=None): metrics = " - ".join([f"{m.name}: {m.result():.4f}" for m in [loss] + (metrics or [])]) end = "" if step < total else "\n" print(f"\r{step}/{total} - " + metrics, end=end)
这段代码很容易理解,除非你不熟悉 Python 的字符串格式化:{m.result():.4f}
将指标的结果格式化为小数点后四位的浮点数,使用\r
(回车)和end=""
确保状态栏始终打印在同一行上。
有了这个,让我们开始吧!首先,我们需要定义一些超参数,并选择优化器、损失函数和指标(在这个例子中只有 MAE):
n_epochs = 5 batch_size = 32 n_steps = len(X_train) // batch_size optimizer = tf.keras.optimizers.SGD(learning_rate=0.01) loss_fn = tf.keras.losses.mean_squared_error mean_loss = tf.keras.metrics.Mean(name="mean_loss") metrics = [tf.keras.metrics.MeanAbsoluteError()]
现在我们准备构建自定义循环了!
for epoch in range(1, n_epochs + 1): print("Epoch {}/{}".format(epoch, n_epochs)) for step in range(1, n_steps + 1): X_batch, y_batch = random_batch(X_train_scaled, y_train) with tf.GradientTape() as tape: y_pred = model(X_batch, training=True) main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred)) loss = tf.add_n([main_loss] + model.losses) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) mean_loss(loss) for metric in metrics: metric(y_batch, y_pred) print_status_bar(step, n_steps, mean_loss, metrics) for metric in [mean_loss] + metrics: metric.reset_states()
这段代码中有很多内容,让我们来逐步解释一下:
- 我们创建两个嵌套循环:一个用于时期,另一个用于时期内的批次。
- 然后我们从训练集中抽取一个随机批次。
- 在
tf.GradientTape()
块内,我们对一个批次进行预测,使用模型作为一个函数,并计算损失:它等于主要损失加上其他损失(在这个模型中,每层有一个正则化损失)。由于mean_squared_error()
函数返回每个实例的一个损失,我们使用tf.reduce_mean()
计算批次的平均值(如果您想对每个实例应用不同的权重,这就是您应该做的地方)。正则化损失已经被减少为每个单一标量,所以我们只需要对它们求和(使用tf.add_n()
,它对相同形状和数据类型的多个张量求和)。 - 接下来,我们要求磁带计算损失相对于每个可训练变量的梯度——不是所有变量!——并将它们应用于优化器以执行梯度下降步骤。
- 然后我们更新平均损失和指标(在当前时期内),并显示状态栏。
- 在每个时期结束时,我们重置平均损失和指标的状态。
如果您想应用梯度裁剪(参见第十一章),请设置优化器的 clipnorm
或 clipvalue
超参数。如果您想对梯度应用任何其他转换,只需在调用 apply_gradients()
方法之前这样做。如果您想向模型添加权重约束(例如,在创建层时设置 kernel_constraint
或 bias_constraint
),您应该更新训练循环以在 apply_gradients()
之后应用这些约束,就像这样:
for variable in model.variables: if variable.constraint is not None: variable.assign(variable.constraint(variable))
警告
在训练循环中调用模型时不要忘记设置 training=True
,特别是如果您的模型在训练和测试期间表现不同(例如,如果它使用 BatchNormalization
或 Dropout
)。如果是自定义模型,请确保将 training
参数传播到您的模型调用的层。
正如您所看到的,有很多事情需要做对,很容易出错。但好的一面是,您可以完全控制。
现在您知道如何自定义模型的任何部分¹⁵和训练算法,让我们看看如何使用 TensorFlow 的自动生成图形功能:它可以显著加快您的自定义代码,并且还将其移植到 TensorFlow 支持的任何平台(参见第十九章)。
TensorFlow 函数和图形
回到 TensorFlow 1,图形是不可避免的(伴随着复杂性),因为它们是 TensorFlow API 的核心部分。自从 TensorFlow 2(2019 年发布)以来,图形仍然存在,但不再是核心部分,而且使用起来简单得多(多得多!)。为了展示它们有多简单,让我们从一个计算其输入的立方的微不足道的函数开始:
def cube(x): return x ** 3
我们显然可以使用 Python 值(如整数或浮点数)调用此函数,或者我们可以使用张量调用它:
>>> cube(2) 8 >>> cube(tf.constant(2.0)) <tf.Tensor: shape=(), dtype=float32, numpy=8.0>
现在,让我们使用 tf.function()
将这个 Python 函数转换为 TensorFlow 函数:
>>> tf_cube = tf.function(cube) >>> tf_cube <tensorflow.python.eager.def_function.Function at 0x7fbfe0c54d50>
然后,这个 TF 函数可以像原始的 Python 函数一样使用,并且将返回相同的结果(但始终作为张量):
>>> tf_cube(2) <tf.Tensor: shape=(), dtype=int32, numpy=8> >>> tf_cube(tf.constant(2.0)) <tf.Tensor: shape=(), dtype=float32, numpy=8.0>
在幕后,tf.function()
分析了 cube()
函数执行的计算,并生成了一个等效的计算图!正如您所看到的,这是相当轻松的(我们很快会看到这是如何工作的)。或者,我们也可以将 tf.function
用作装饰器;这实际上更常见:
@tf.function def tf_cube(x): return x ** 3
原始的 Python 函数仍然可以通过 TF 函数的 python_function
属性访问,以防您需要它:
>>> tf_cube.python_function(2) 8
TensorFlow 优化计算图,修剪未使用的节点,简化表达式(例如,1 + 2 将被替换为 3)等。一旦优化的图准备就绪,TF 函数将有效地执行图中的操作,按适当的顺序(并在可能时并行执行)。因此,TF 函数通常比原始 Python 函数运行得快得多,特别是如果它执行复杂计算。大多数情况下,您实际上不需要知道更多:当您想要提升 Python 函数时,只需将其转换为 TF 函数。就这样!
此外,如果在调用tf.function()
时设置jit_compile=True
,那么 TensorFlow 将使用加速线性代数(XLA)为您的图编译专用内核,通常融合多个操作。例如,如果您的 TF 函数调用tf.reduce_sum(a * b + c)
,那么没有 XLA,函数首先需要计算a * b
并将结果存储在临时变量中,然后将c
添加到该变量中,最后调用tf.reduce_sum()
处理结果。使用 XLA,整个计算将编译为单个内核,该内核将一次性计算tf.reduce_sum(a * b + c)
,而不使用任何大型临时变量。这不仅速度更快,而且使用的 RAM 大大减少。
当您编写自定义损失函数、自定义指标、自定义层或任何其他自定义函数,并在 Keras 模型中使用它(就像我们在本章中一直做的那样),Keras 会自动将您的函数转换为 TF 函数——无需使用tf.function()
。因此,大多数情况下,这种魔术是 100%透明的。如果您希望 Keras 使用 XLA,只需在调用compile()
方法时设置jit_compile=True
。简单!
提示
您可以通过在创建自定义层或自定义模型时设置dynamic=True
来告诉 Keras不将您的 Python 函数转换为 TF 函数。或者,您可以在调用模型的compile()
方法时设置run_eagerly=True
。
默认情况下,TF 函数为每个唯一的输入形状和数据类型生成一个新图,并将其缓存以供后续调用。例如,如果您调用tf_cube(tf.constant(10))
,将为形状为[]的 int32 张量生成一个图。然后,如果您调用tf_cube(tf.constant(20))
,将重用相同的图。但是,如果您随后调用tf_cube(tf.constant([10, 20]))
,将为形状为[2]的 int32 张量生成一个新图。这就是 TF 函数处理多态性(即不同的参数类型和形状)的方式。但是,这仅适用于张量参数:如果将数值 Python 值传递给 TF 函数,则将为每个不同的值生成一个新图:例如,调用tf_cube(10)
和tf_cube(20)
将生成两个图。
警告
如果您多次使用不同的数值 Python 值调用 TF 函数,则将生成许多图,减慢程序速度并使用大量 RAM(您必须删除 TF 函数才能释放它)。Python 值应保留用于将具有少量唯一值的参数,例如每层神经元的数量之类的超参数。这样可以使 TensorFlow 更好地优化模型的每个变体。
AutoGraph 和跟踪
那么 TensorFlow 如何生成图呢?它首先通过分析 Python 函数的源代码来捕获所有控制流语句,比如for
循环、while
循环和if
语句,以及break
、continue
和return
语句。这第一步被称为AutoGraph。TensorFlow 必须分析源代码的原因是 Python 没有提供其他捕获控制流语句的方法:它提供了像__add__()
和__mul__()
这样的魔术方法来捕获+
和*
等运算符,但没有__while__()
或__if__()
这样的魔术方法。在分析函数代码之后,AutoGraph 会输出一个升级版本的函数,其中所有控制流语句都被适当的 TensorFlow 操作替换,比如tf.while_loop()
用于循环,tf.cond()
用于if
语句。例如,在图 12-4 中,AutoGraph 分析了sum_squares()
Python 函数的源代码,并生成了tf__sum_squares()
函数。在这个函数中,for
循环被替换为loop_body()
函数的定义(包含原始for
循环的主体),然后调用for_stmt()
函数。这个调用将在计算图中构建适当的tf.while_loop()
操作。
图 12-4. TensorFlow 如何使用 AutoGraph 和跟踪生成图
接下来,TensorFlow 调用这个“升级”函数,但不是传递参数,而是传递一个符号张量—一个没有实际值的张量,只有一个名称、一个数据类型和一个形状。例如,如果您调用sum_squares(tf.constant(10))
,那么tf__sum_squares()
函数将被调用,传递一个类型为 int32、形状为[]的符号张量。该函数将在图模式下运行,这意味着每个 TensorFlow 操作都会在图中添加一个节点来表示自己和其输出张量(与常规模式相反,称为急切执行或急切模式)。在图模式下,TF 操作不执行任何计算。图模式是 TensorFlow 1 中的默认模式。在图 12-4 中,您可以看到tf__sum_squares()
函数被调用,其参数是一个符号张量(在这种情况下,一个形状为[]的 int32 张量),以及在跟踪期间生成的最终图。节点表示操作,箭头表示张量(生成的函数和图都被简化了)。
提示
为了查看生成的函数源代码,您可以调用tf.autograph.to_code(sum_squares.python_function)
。代码并不一定要漂亮,但有时可以帮助调试。
TF 函数规则
大多数情况下,将执行 TensorFlow 操作的 Python 函数转换为 TF 函数是微不足道的:用@tf.function
装饰它,或者让 Keras 为您处理。但是,有一些规则需要遵守:
- 如果调用任何外部库,包括 NumPy 甚至标准库,这个调用只会在跟踪期间运行;它不会成为图的一部分。实际上,TensorFlow 图只能包括 TensorFlow 构造(张量、操作、变量、数据集等)。因此,请确保使用
tf.reduce_sum()
而不是np.sum()
,tf.sort()
而不是内置的sorted()
函数,等等(除非您真的希望代码只在跟踪期间运行)。这还有一些额外的影响:
- 如果您定义了一个 TF 函数
f(*x*)
,它只返回np.random.rand()
,那么只有在跟踪函数时才会生成一个随机数,因此f(tf.constant(2.))
和f(tf.constant(3.))
将返回相同的随机数,但f(tf.constant([2., 3.]))
将返回一个不同的随机数。如果将np.random.rand()
替换为tf.random.uniform([])
,那么每次调用都会生成一个新的随机数,因为该操作将成为图的一部分。 - 如果您的非 TensorFlow 代码具有副作用(例如记录某些内容或更新 Python 计数器),那么您不应该期望每次调用 TF 函数时都会发生这些副作用,因为它们只会在函数被跟踪时发生。
- 您可以在
tf.py_function()
操作中包装任意的 Python 代码,但这样做会影响性能,因为 TensorFlow 将无法对此代码进行任何图优化。这也会降低可移植性,因为图仅在安装了正确库的平台上运行 Python 可用(和 Python 可用的平台)。
- 您可以调用其他 Python 函数或 TF 函数,但它们应该遵循相同的规则,因为 TensorFlow 将捕获它们的操作在计算图中。请注意,这些其他函数不需要用
@tf.function
装饰。 - 如果函数创建了 TensorFlow 变量(或任何其他有状态的 TensorFlow 对象,例如数据集或队列),它必须在第一次调用时才能这样做,否则您将收到异常。通常最好在 TF 函数之外创建变量(例如,在自定义层的
build()
方法中)。如果要为变量分配新值,请确保调用其assign()
方法,而不是使用=
运算符。 - 您的 Python 函数的源代码应该对 TensorFlow 可用。如果源代码不可用(例如,如果您在 Python shell 中定义函数,无法访问源代码,或者如果您仅将编译后的 *.pyc Python 文件部署到生产环境),则图生成过程将失败或功能有限。
- TensorFlow 仅会捕获对张量或
tf.data.Dataset
进行迭代的for
循环(请参见第十三章)。因此,请确保使用for i in tf.range(*x*)
而不是for i in range(*x*)
,否则循环将不会在图中被捕获。相反,它将在跟踪期间运行。(如果for
循环旨在构建图,例如在神经网络中创建每个层,那么这可能是您想要的。) - 一如既往,出于性能原因,您应该尽可能使用矢量化实现,而不是使用循环。
是时候总结了!在本章中,我们从 TensorFlow 的简要概述开始,然后看了 TensorFlow 的低级 API,包括张量、操作、变量和特殊数据结构。然后我们使用这些工具来自定义 Keras API 中的几乎每个组件。最后,我们看了 TF 函数如何提升性能,如何使用 AutoGraph 和跟踪生成图形,以及编写 TF 函数时应遵循的规则(如果您想进一步打开黑匣子并探索生成的图形,您将在附录 D 中找到技术细节)。
在下一章中,我们将学习如何使用 TensorFlow 高效加载和预处理数据。
Sklearn、TensorFlow 与 Keras 机器学习实用指南第三版(五)(3)https://developer.aliyun.com/article/1482434