本文介绍下 RNN 及几种变种的结构和对应的 TensorFlow 源码实现,另外通过简单的实例来实现 TensorFlow RNN 相关类的调用。
RNN
RNN,循环神经网络,Recurrent Neural Networks。人们思考问题往往不是从零开始的,比如阅读时我们对每个词的理解都会依赖于前面看到的一些信息,而不是把前面看的内容全部抛弃再去理解某处的信息。应用到深度学习上面,如果我们想要学习去理解一些依赖上文的信息,RNN 便可以做到,它有一个循环的操作,可以使其可以保留之前学习到的内容。
RNN 的结构如下:
在上图网络结构中,对于矩形块 A 的那部分,通过输入xt(t时刻的特征向量),它会输出一个结果ht(t时刻的状态或者输出)。网络中的循环结构使得某个时刻的状态能够传到下一个时刻。
这些循环的结构让 RNNs 看起来有些难以理解,但我们可以把 RNNs 看成是一个普通的网络做了多次复制后叠加在一起组成的,每一网络会把它的输出传递到下一个网络中。我们可以把 RNNs 在时间步上进行展开,就得到下图这样:
所以最基本的 RNN Cell 输入就是 xt,它还会输出一个隐含内容传递到下一个 Cell,同时还会生成一个结果 ht,其最基本的结构如如下:
仅仅是输入的 xt 和隐藏状态进行 concat,然后经过线性变换后经过一个 tanh 激活函数便输出了,另外隐含内容和输出结果是相同的内容。
我们来分析一下 TensorFlow 里面 RNN Cell 的实现。
TensorFlow 实现 RNN Cell 的位置在 python/ops/rnn_cell_impl.py,首先其实现了一个 RNNCell 类,继承了 Layer 类,其内部有三个比较重要的方法,state_size()、output_size()、__call__() 方法,其中 state_size() 和 output_size() 方法设置为类属性,可以当做属性来调用,实现如下:
@property
def state_size(self):
"""size(s) of state(s) used by this cell.
It can be represented by an Integer, a TensorShape or a tuple of Integers
or TensorShapes.
"""
raise NotImplementedError("Abstract method")
@property
def output_size(self):
"""Integer or TensorShape: size of outputs produced by this cell."""
raise NotImplementedError("Abstract method")
分别代表 Cell 的状态和输出维度,和 Cell 中的神经元数量有关,但这里两个方法都没有实现,意思是说我们必须要实现一个子类继承 RNNCell 类并实现这两个方法。
另外对于 __call__() 方法,实际上就是当初始化的对象直接被调用的时候触发的方法,实现如下:
def __call__(self, inputs, state, scope=None):
if scope is not None:
with vs.variable_scope(scope,
custom_getter=self._rnn_get_variable) as scope:
return super(RNNCell, self).__call__(inputs, state, scope=scope)
else:
with vs.variable_scope(vs.get_variable_scope(),
custom_getter=self._rnn_get_variable):
return super(RNNCell, self).__call__(inputs, state)
实际上是调用了父类 Layer 的 __call__() 方法,但父类中 __call__() 方法中又调用了 call() 方法,而 Layer 类的 call() 方法的实现如下:
def call(self, inputs, **kwargs):
return inputs
父类的 call() 方法实现非常简单,所以要实现其真正的功能,只需要在继承 RNNCell 类的子类中实现 call() 方法即可。
接下来我们看下 RNN Cell 的最基本的实现,叫做 BasicRNNCell,其代码如下:
class BasicRNNCell(RNNCell):
"""The most basic RNN cell.
Args:
num_units: int, The number of units in the RNN cell.
activation: Nonlinearity to use. Default: `tanh`.
reuse: (optional) Python boolean describing whether to reuse variables
in an existing scope. If not `True`, and the existing scope already has
the given variables, an error is raised.
"""
def __init__(self, num_units, activation=None, reuse=None):
super(BasicRNNCell, self).__init__(_reuse=reuse)
self._num_units = num_units
self._activation = activation or math_ops.tanh
self._linear = None
@property
def state_size(self):
return self._num_units
@property
def output_size(self):
return self._num_units
def call(self, inputs, state):
"""Most basic RNN: output = new_state = act(W * input + U * state + B)."""
if self._linear is None:
self._linear = _Linear([inputs, state], self._num_units, True)
output = self._activation(self._linear([inputs, state]))
return output, output
可以看到在初始化的时候,最终要的一个参数是 num_units,意思就是这个 Cell 中神经元的个数,另外还有一个参数 activation 即默认使用的激活函数,默认使用的 tanh,reuse 代表该 Cell 是否可以被重新使用。
在 state_size()、output_size() 方法里,其返回的内容都是 num_units,即神经元的个数,接下来 call() 方法中,传入的参数为 inputs 和 state,即输入的 x 和 上一次的隐含状态,首先实例化了一个 _Linear 类,这个类实际上就是做线性变换的类,将二者传递过来,然后直接调用,就实现了 w * [inputs, state] + b 的线性变换,其中 _Linear 类的 __call__() 方法实现如下:
def __call__(self, args):
if not self._is_sequence:
args = [args]
if len(args) == 1:
res = math_ops.matmul(args[0], self._weights)
else:
res = math_ops.matmul(array_ops.concat(args, 1), self._weights)
if self._build_bias:
res = nn_ops.bias_add(res, self._biases)
return res
很明显这里传递了 [inputs, state] 作为 __call__() 方法的 args,会执行 concat() 和 matmul() 方法,然后接着再执行 bias_add() 方法,这样就实现了线性变换。
最后回到 BasicRNNCell 的 call() 方法中,在 _linear() 方法外面又包括了一层 _activation() 方法,即对线性变换应用一次 tanh 激活函数处理,作为输出结果。
最后返回的结果是 output 和 output,第一个代表 output,第二个代表隐状态,其值也等于 output。
我们用一个实例来感受一下:
import tensorflow as tf
cell = tf.nn.rnn_cell.BasicRNNCell(num_units=128)
print(cell.state_size)
inputs = tf.placeholder(tf.float32, shape=[32, 100])
h0 = cell.zero_state(32, tf.float32)
output, h1 = cell(inputs=inputs, state=h0)
print(output, output.shape)
print(h1, h1.shape)
这里我们首先初始化了一个神经元个数为 128 的 BasicRNNCell 类,然后构造了一个 shape 为 [32, 100] 的变量作为 inputs,其代表 batch_size 为 32, 维度为 100,随后初始化了初始隐藏状态,调用了 zero_state() 方法,然后直接调用 cell,实际上是最终调用了其 call() 方法,最后得到 output 和 h1,打印输出结果:
128
Tensor("basic_rnn_cell/Tanh:0", shape=(32, 128), dtype=float32) (32, 128)
Tensor("basic_rnn_cell/Tanh:0", shape=(32, 128), dtype=float32) (32, 128)
可以看到,当输入变量维度为 100 的时候,经过一个 128 神经元 Cell 之后,输出维度变成了 128,其输出 shape 变成了 [32, 128],且此时输出结果和隐藏状态是相同的。
LSTM
RNNs 的出现,主要是因为它们能够把以前的信息联系到现在,从而解决现在的问题。比如,利用前面的信息,能够帮助我们理解当前的内容。
有时候,我们在处理当前任务的时候,只需要看一下比较近的一些信息。比如在一个语言模型中,我们要通过上文来预测一下个词可能是什么,那么当我们看到 “the clouds are in the?”时,不需要更多的信息,我们就能够自然而然的想到下一个词应该是“sky”。在这样的情况下,我们所要预测的内容和相关信息之间的间隔很小,这种情况下 RNNs 就能够利用过去的信息, 很容易实现:
但是如果我们想依赖前文距离非常远的信息时,普通的 RNN 就非常难以做到了,随着间隔信息的增大,RNN 难以对其做关联:
但是 LSTM 可以用来解决这个问题。
LSTM,Long Short Term Memory Networks,是 RNN 的一个变种,经试验它可以用来解决更多问题,并取得了非常好的效果。
LSTM Cell 的结构如下:
LSTMs 最关键的地方在于 Cell 的状态 和 结构图上面的那条横穿的水平线。
Cell 状态的传输就像一条传送带,向量从整个 Cell 中穿过,只是做了少量的线性操作。这种结构能够很轻松地实现信息从整个 Cell 中穿过而不做改变。
若只有上面的那条水平线是没办法实现添加或者删除信息的,信息的操作是是通过一种叫做门的结构来实现的。
这里我们可以把门分为三个:遗忘门(Forget Gate)、传入门(Input Gate)、输出门(Output Gate)。
遗忘门(Forget Gate)
首先是 LSTM 要决定让那些信息继续通过这个 Cell,这是通过 Forget Gate 的 sigmoid 神经层来实现的。它的输入是ht−1和xt,输出是一个数值都在 0,1 之间的向量,表示让 Ct−1 的各部分信息通过的比重。 0 表示“不让任何信息通过”, 1 表示“让所有信息通过”。
传入门(Input Gate)
下一步是决定让多少新的信息加入到 Cell 中来,一个叫做 Input Gate 的 sigmoid 层决定哪些信息需要更新,一个 New Input 通过 tanh 生成一个向量,也就是备选的用来更新的内容,Ct~ 。在下一步,我们把这两部分联合起来,对 Cell 的状态进行一个更新。
在经过 Forget Gate 和 Input Gate 处理后,我们就可以对输入的 Ct-1 做更新了,即把Ct−1 更新为 Ct,首先我们把旧的状态 Ct−1 和 ft 相乘, 把一些不想保留的信息忘掉。然后加上 it∗Ct~,这部分信息就是我们要添加的新内容,这样就可以完成对 Ct-1 的更新。
输出门 (Output Gate)
最后我们需要来决定输出什么值,输出主要是依赖于 Cell 的状态 Ct,但是又不仅仅依赖于 Ct,而是需要经过一个过滤的处理。首先,我们还是使用一个 sigmoid 层来决定 Ct 中的哪部分信息会被输出。然后我们把 Ct 通过一个 tanh 激活函数处理,然后把其输出和 sigmoid 计算出来的权重相乘,这样就得到了最后输出的结果。
到了最后,其输出结果有三个内容,其中输出结果就是最上面的箭头代指的内容,即最终计算的结果,隐层包括两部分内容,一个是 Ct,一个是最下方的 ht,我们可以将其合并为一个变量来表示。
接下来我们来看下 LSTMCell 的 TensorFlow 代码实现。
首先它的类是 BasicLSTMCell 类,继承了 RNNCell 类,其初始化方法 init() 实现如下:
def __init__(self, num_units, forget_bias=1.0,
state_is_tuple=True, activation=None, reuse=None):
super(BasicLSTMCell, self).__init__(_reuse=reuse)
if not state_is_tuple:
logging.warn("%s: Using a concatenated state is slower and will soon be "
"deprecated. Use state_is_tuple=True.", self)
self._num_units = num_units
self._forget_bias = forget_bias
self._state_is_tuple = state_is_tuple
self._activation = activation or math_ops.tanh
self._linear = None
这里必须传入的参数仍然是 num_units,即神经元的个数,然后 forget_bias 是初始化 Forget Gate 的偏置大小,state_is_tuple 指的是输出状态类型是元组类型,activation 代表默认激活函数,reuse 代表是否可以被重复使用。
接下来看下 state_size() 方法和 output_size() 方法,实现如下:
@property
def state_size(self):
return (LSTMStateTuple(self._num_units, self._num_units)
if self._state_is_tuple else 2 * self._num_units)
@property
def output_size(self):
return self._num_units
这里 state_size() 方法变了,因为输出的 state 需要将 Ct 和隐含状态合并,所以它需要包含两部分的内容,如果传入的参数 state_is_tuple 为 True 的话,状态会被表示成一个元组,否则会是 num_units 乘以 2 的数字,默认是元组形式。output_size() 方法则保持不变。
对于 call() 方法,其实现如下:
def call(self, inputs, state):
"""Long short-term memory cell (LSTM).
Args:
inputs: `2-D` tensor with shape `[batch_size x input_size]`.
state: An `LSTMStateTuple` of state tensors, each shaped
`[batch_size x self.state_size]`, if `state_is_tuple` has been set to
`True`. Otherwise, a `Tensor` shaped
`[batch_size x 2 * self.state_size]`.
Returns:
A pair containing the new hidden state, and the new state (either a
`LSTMStateTuple` or a concatenated state, depending on
`state_is_tuple`).
"""
sigmoid = math_ops.sigmoid
# Parameters of gates are concatenated into one multiply for efficiency.
if self._state_is_tuple:
c, h = state
else:
c, h = array_ops.split(value=state, num_or_size_splits=2, axis=1)
if self._linear is None:
self._linear = _Linear([inputs, h], 4 * self._num_units, True)
# i = input_gate, j = new_input, f = forget_gate, o = output_gate
i, j, f, o = array_ops.split(
value=self._linear([inputs, h]), num_or_size_splits=4, axis=1)
new_c = (
c * sigmoid(f + self._forget_bias) + sigmoid(i) * self._activation(j))
new_h = self._activation(new_c) * sigmoid(o)
if self._state_is_tuple:
new_state = LSTMStateTuple(new_c, new_h)
else:
new_state = array_ops.concat([new_c, new_h], 1)
return new_h, new_state
首先为了获取 c, h,需要将其从 state 中分离开来,如果传入的 state 是元组的话可以直接分解,否则需要调用 split() 方法来分解:
if self._state_is_tuple:
c, h = state
else:
c, h = array_ops.split(value=state, num_or_size_splits=2, axis=1)
接下来定义了几个门的实现:
i, j, f, o = array_ops.split(value=self._linear([inputs, h]), num_or_size_splits=4, axis=1)
放到一起来用 Linear 计算然后分成了 4 份,分别代表 Input Gate、New Input、Forget Gate、Output Gate,用 i、j、f、o 来表示,这时候四个变量都经过了线性变换,乘以权重并做了偏置操作。
接下来就是更新 Ct-1 为 Ct 和得到隐含状态输出了,都是遵循 LSTM 内部的公式实现:
new_c = (c * sigmoid(f + self._forget_bias) + sigmoid(i) * self._activation(j))
new_h = self._activation(new_c) * sigmoid(o)
这里值得注意的是还多加了一个 _forget_bias 变量,即设置了初始化偏置,以免初始输出为 0 的问题。
最后将 new_c 和 new_h 进行合并,如果要输出元组,那么就合并为元组,否则二者进行 concat 操作,返回的结果是 new_h、new_state,前者即 Cell 的输出结果,后者代表隐含状态:
if self._state_is_tuple:
new_state = LSTMStateTuple(new_c, new_h)
else:
new_state = array_ops.concat([new_c, new_h], 1)
return new_h, new_state
我们再用一个实例来感受一下 BasicLSTMCell 的用法:
import tensorflow as tf
cell = tf.nn.rnn_cell.BasicLSTMCell(num_units=128)
print(cell.state_size)
inputs = tf.placeholder(tf.float32, shape=(32, 100))
h0 = cell.zero_state(32, tf.float32)
output, h1 = cell(inputs=inputs, state=h0)
print(h1)
print(h1.h, h1.h.shape)
print(h1.c, h1.c.shape)
print(output, output.shape)
这里我们首先初始化了一个神经元个数为 128 的 BasicRNNCell 类,然后构造了一个 shape 为 [32, 100] 的变量作为 inputs,其代表 batch_size 为 32, 维度为 100,随后初始化了初始隐藏状态,调用了 zero_state() 方法,然后直接调用 cell,实际上是最终调用了其 call() 方法,最后得到 output 和 h1,此时 h1 是一个元组,它还可以分离成 h 和 c,分别打印其对象和维度,结果如下:
LSTMStateTuple(c=128, h=128)
LSTMStateTuple(c=<tf.Tensor 'add_1:0' shape=(32, 128) dtype=float32>, h=<tf.Tensor 'mul_2:0' shape=(32, 128) dtype=float32>)
Tensor("mul_2:0", shape=(32, 128), dtype=float32) (32, 128)
Tensor("add_1:0", shape=(32, 128), dtype=float32) (32, 128)
Tensor("mul_2:0", shape=(32, 128), dtype=float32) (32, 128)
可以看到其维度都是 [32, 128],而且 h1.h 和 output 是相同的。
另外 LSTM 有许多变种,其中一个比较有名的就是 Gers & Schmidhuber (2000) 提出的,它在原来的基础上行添加了 Peephole Connections,使得遗忘门可以受 Ct-1 的影响。
另外还有一个变种就是将 Forget Gate 和 Input Gate 二者联合起来,做到要么遗忘老的输入新的,要么保留老的不输入新的。
但接下来还有一个更常用的变种,俺就是 GRU,它是由 Cho, et al. (2014) 提出的,在提出的同时他还提出了 Seq2Seq 模型,为 Generation Model 做好了铺垫。
GRU
GRU,Gated Recurrent Unit,在 GRU 中,只有两个门:重置门(Reset Gate)和更新门(Update Gate)。同时在这个结构中,把 Ct 和隐藏状态进行了合并,整体结构比标准的 LSTM 结构要简单,而且这个结构后来也非常流行。
接下来我们看下 TensorFlow 中 GRUCell 的实现,代码如下:
class GRUCell(RNNCell):
"""Gated Recurrent Unit cell (cf. http://arxiv.org/abs/1406.1078).
Args:
num_units: int, The number of units in the GRU cell.
activation: Nonlinearity to use. Default: `tanh`.
reuse: (optional) Python boolean describing whether to reuse variables
in an existing scope. If not `True`, and the existing scope already has
the given variables, an error is raised.
kernel_initializer: (optional) The initializer to use for the weight and
projection matrices.
bias_initializer: (optional) The initializer to use for the bias.
"""
def __init__(self,
num_units,
activation=None,
reuse=None,
kernel_initializer=None,
bias_initializer=None):
super(GRUCell, self).__init__(_reuse=reuse)
self._num_units = num_units
self._activation = activation or math_ops.tanh
self._kernel_initializer = kernel_initializer
self._bias_initializer = bias_initializer
self._gate_linear = None
self._candidate_linear = None
@property
def state_size(self):
return self._num_units
@property
def output_size(self):
return self._num_units
def call(self, inputs, state):
"""Gated recurrent unit (GRU) with nunits cells."""
if self._gate_linear is None:
bias_ones = self._bias_initializer
if self._bias_initializer is None:
bias_ones = init_ops.constant_initializer(1.0, dtype=inputs.dtype)
with vs.variable_scope("gates"): # Reset gate and update gate.
self._gate_linear = _Linear(
[inputs, state],
2 * self._num_units,
True,
bias_initializer=bias_ones,
kernel_initializer=self._kernel_initializer)
value = math_ops.sigmoid(self._gate_linear([inputs, state]))
r, u = array_ops.split(value=value, num_or_size_splits=2, axis=1)
r_state = r * state
if self._candidate_linear is None:
with vs.variable_scope("candidate"):
self._candidate_linear = _Linear(
[inputs, r_state],
self._num_units,
True,
bias_initializer=self._bias_initializer,
kernel_initializer=self._kernel_initializer)
c = self._activation(self._candidate_linear([inputs, r_state]))
new_h = u * state + (1 - u) * c
return new_h, new_h
在 state_size()、output_size() 方法里,其返回的内容都是 num_units,即神经元的个数。
接下来 call() 方法中,因为 Reset Gate rt 和 Update Gate zt 分别用变量 r、u 表示,它们需要先对 ht-1 即 state 和 xt 做合并,然后再实现线性变换,再调用 sigmod 函数得到:
value = math_ops.sigmoid(self._gate_linear([inputs, state]))
r, u = array_ops.split(value=value, num_or_size_splits=2, axis=1)
然后需要求解 ht~,首先用 rt 和 ht-1 即 state 相乘:
r_state = r * state
然后将其放到线性函数里面,在调用 tanh 激活函数即可:
c = self._activation(self._candidate_linear([inputs, r_state]))
最后计算隐含状态和输出结果,二者一致:
new_h = u * state + (1 - u) * c
return new_h, new_h
这样即可返回得到输出结果和隐藏状态。
我们用一个实例感受一下:
import tensorflow as tf
cell = tf.nn.rnn_cell.GRUCell(num_units=128)
print(cell.state_size)
inputs = tf.placeholder(tf.float32, shape=[32, 100])
h0 = cell.zero_state(32, tf.float32)
output, h1 = cell(inputs=inputs, state=h0)
print(output, output.shape)
print(h1, h1.shape)
运行结果:
128
Tensor("gru_cell/add:0", shape=(32, 128), dtype=float32) (32, 128)
Tensor("gru_cell/add:0", shape=(32, 128), dtype=float32) (32, 128)
这个结果和 BasicRNNCell 并无二致,但 GRUCell 内部的结构使模型的效果更加优化,一般我们也会选取 GRUCell 来代替原生的 BasicRNNCell。