本文作者Erik Hallström是一名深度学习研究工程师,他的这份教程以Echo-RNN为例,介绍了如何在TensorFlow环境中构建一个简单的循环神经网络。
什么是RNN?
RNN是循环神经网络(Recurrent Neural Network)的英文缩写,它能结合数据点之间的特定顺序和幅值大小等多个特征,来处理序列数据。更重要的是,这种网络的输入序列可以是任意长度的。
举一个简单的例子:数字时间序列,具体任务是根据先前值来预测后续值。在每个时间步中,循环神经网络的输入是当前值,以及一个表征该网络在之前的时间步中已经获得信息的状态向量。该状态向量是RNN网络的编码记忆单元,在训练网络之前初始化为零向量。
图1:RNN处理序列数据的步骤示意图。
本文只对RNN做简要介绍,主要专注于实践:如何构建RNN网络。如果有网络结构相关的疑惑,建议多看看说明性文章。
关于RNN的介绍,强烈推荐《A Critical Review of Recurrent Neural Networks for Sequence Learning》,这篇出自加州大学圣地亚哥分校研究人员的文章介绍了几乎所有最新最全面的循环神经网络。(地址:https://arxiv.org/pdf/1506.00019.pdf)
在了解RNN网络的基本知识后,就很容易理解以下内容。
构建网络
我们先建立一个简单的回声状态网络(Echo-RNN)。这种网络能记忆输入数据信息,在若干时间步后将其回传。我们先设定若干个网络常数,读完文章你就能明白它们的作用。
from __future__ import print_function, divisionimport numpy as npimport tensorflow as tfimport matplotlib.pyplot as plt
num_epochs = 100total_series_length = 50000truncated_backprop_length = 15state_size = 4num_classes = 2echo_step = 3batch_size = 5num_batches = total_series_length//batch_size//truncated_backprop_length
生成数据
现在生成随机的训练数据,输入为一个随机的二元向量,在echo_step
个时间步后,可得到输入的“回声”,即输出。
def generateData():
x = np.array(np.random.choice(2, total_series_length, p=[0.5, 0.5]))
y = np.roll(x, echo_step)
y[0:echo_step] = 0
x = x.reshape((batch_size, -1)) # The first index changing slowest, subseries as rows
y = y.reshape((batch_size, -1)) return (x, y)
包含batch_size
的两行代码,将数据重构为新矩阵。神经网络的训练,需要利用小批次数据(mini-batch),来近似得到关于神经元权重的损失函数梯度。在训练过程中,随机批次操作能防止过拟合和降低硬件压力。整个数据集通过数据重构转化为一个矩阵,并将其分解为多个小批次数据。
图2:重构数据矩阵的示意图,箭头曲线指示了在不同行上的相邻时间步。浅灰色矩形代表“0”,深灰色矩形代表“1”。
构建计算图
首先在TensorFlow中建立一个计算图,指定将要执行的运算。该计算图的输入和输出通常是多维数组,也被称为张量(tensor)。我们可以利用CPU、GPU和远程服务器的计算资源,在会话中迭代执行该计算图。
变量和占位符
本文所用的基本TensorFlow数据结构是变量和占位符。占位符是计算图的“起始节点”。在运行每个计算图时,批处理数据被传递到占位符中。另外,RNN状态向量也是存储在占位符中,在每一次运行后更新输出。
batchX_placeholder = tf.placeholder(tf.float32, [batch_size, truncated_backprop_length])
batchY_placeholder = tf.placeholder(tf.int32, [batch_size, truncated_backprop_length])
init_state = tf.placeholder(tf.float32, [batch_size, state_size])
网络的权重和偏差作为TensorFlow的变量,在运行时保持不变,并在输入批数据后进行逐步更新。
W = tf.Variable(np.random.rand(state_size+1, state_size), dtype=tf.float32)
b = tf.Variable(np.zeros((1,state_size)), dtype=tf.float32)
W2 = tf.Variable(np.random.rand(state_size, num_classes),dtype=tf.float32)
b2 = tf.Variable(np.zeros((1,num_classes)), dtype=tf.float32)
下图表示了输入数据矩阵,以及虚线窗口指出了占位符的当前位置。在每次运行时,这个“批处理窗口”根据箭头指示方向,以定义好的长度从左边滑到右边。在示意图中,batch_size
(批数据数量)为3,truncated_backprop_length
(截断反传长度)为3,total_series_length
(全局长度)为36。这些参数是用来示意的,与实际代码中定义的值不一样。在示意图中序列各点也以数字标出。
图3:训练数据的示意图,用虚线矩形指示当前批数据,用数字标明了序列顺序。
拆分序列
现在开始构建RNN计算图的下个部分,首先我们要以相邻的时间步分割批数据。
# Unpack columnsinputs_series = tf.unpack(batchX_placeholder, axis=1)
labels_series = tf.unpack(batchY_placeholder, axis=1)
view raw
如下图所示,可以按批次分解各列,转成list格式文件。RNN会同时从不同位置开始训练时间序列:在示例中分别从4到6、从16到18和从28到30。用plural
和series
做变量名,是为了强调该变量为list文件,用来在每一步中表示具有多个位置的时间序列。
图4:将数据拆分为多列的原理图,用数字标出序列顺序,箭头表示相邻的时间步。
在我们的时间序列数据中,在三个位置同时开启训练,所以在前向传播时需要保存三个状态。我们在参数定义时就已经考虑到这一点了,故将init_state设置为3。
前向传播
接下来,我们继续构建计算图中执行RNN计算功能的模块。
# Forward passcurrent_state = init_state
states_series = []for current_input in inputs_series:
current_input = tf.reshape(current_input, [batch_size, 1])
input_and_state_concatenated = tf.concat(1, [current_input, current_state]) # Increasing number of columns
next_state = tf.tanh(tf.matmul(input_and_state_concatenated, W) + b) # Broadcasted addition
states_series.append(next_state)
current_state = next_state
在这段代码中,我们通过计算current_input
Wa
+ current_state
Wbin
,得到两个仿射变换的总和input_and_state_concatenated
。在连接这两个张量后,只用了一个矩阵乘法即可在每个批次中添加所有样本的偏置b
。
图5:第8行代码的矩阵计算示意图,省略了非线性变换arctan。
你可能会想知道变量truncated_backprop_lengthis
的作用。在训练时,RNN被看做是一种在每一层都有冗余权重的深层神经网络。在训练开始时,这些层由于展开后占据了太多的计算资源,因此要在有限的时间步内截断。在每个批次训练时,网络误差反向传播了三次。
计算Loss
这是计算图的最后一部分,我们建立了一个从状态到输出的全连接层,用于softmax分类,标签采用One-hot编码,用于计算每个批次的Loss。
logits_series = [tf.matmul(state, W2) + b2 for state in states_series] #Broadcasted additionpredictions_series = [tf.nn.softmax(logits) for logits in logits_series]
losses = [tf.nn.sparse_softmax_cross_entropy_with_logits(logits, labels) for logits, labels in zip(logits_series,labels_series)]
total_loss = tf.reduce_mean(losses)
train_step = tf.train.AdagradOptimizer(0.3).minimize(total_loss)
最后一行是添加训练函数,TensorFlow将自动执行反向传播函数:对每批数据执行一次计算图,并逐步更新网络权重。
这里调用的tosparse_softmax_cross_entropy_with_logits
函数,能在内部算得softmax函数值后,继续计算交叉熵。在示例中,各类是互斥的,非0即1,这也是将要采用稀疏自编码的原因。标签的格式为[batch_size,num_classes]
。
可视化结果
我们利用可视化功能tensorboard,在训练过程中观察网络训练情况。它将会在时间维度上绘制Loss值,显示在训练批次中数据输入、数据输出和网络结构对不同样本的实时预测效果。
def plot(loss_list, predictions_series, batchX, batchY):
plt.subplot(2, 3, 1)
plt.cla()
plt.plot(loss_list) for batch_series_idx in range(5):
one_hot_output_series = np.array(predictions_series)[:, batch_series_idx, :]
single_output_series = np.array([(1 if out[0] < 0.5 else 0) for out in one_hot_output_series])
plt.subplot(2, 3, batch_series_idx + 2)
plt.cla()
plt.axis([0, truncated_backprop_length, 0, 2])
left_offset = range(truncated_backprop_length)
plt.bar(left_offset, batchX[batch_series_idx, :], width=1, color="blue")
plt.bar(left_offset, batchY[batch_series_idx, :] * 0.5, width=1, color="red")
plt.bar(left_offset, single_output_series * 0.3, width=1, color="green")
plt.draw()
plt.pause(0.0001)
建立训练会话
已经完成构建网络的工作,开始训练网络。在TensorFlow中,该计算图会在一个会话中执行。在每一步开始时,都会随机生成新的数据。
with tf.Session() as sess:
sess.run(tf.initialize_all_variables())
plt.ion()
plt.figure()
plt.show()
loss_list = [] for epoch_idx in range(num_epochs):
x,y = generateData()
_current_state = np.zeros((batch_size, state_size))
print("New data, epoch", epoch_idx) for batch_idx in range(num_batches):
start_idx = batch_idx * truncated_backprop_length
end_idx = start_idx + truncated_backprop_length
batchX = x[:,start_idx:end_idx]
batchY = y[:,start_idx:end_idx]
_total_loss, _train_step, _current_state, _predictions_series = sess.run(
[total_loss, train_step, current_state, predictions_series],
feed_dict={
batchX_placeholder:batchX,
batchY_placeholder:batchY,
init_state:_current_state
})
loss_list.append(_total_loss) if batch_idx%100 == 0:
print("Step",batch_idx, "Loss", _total_loss)
plot(loss_list, _predictions_series, batchX, batchY)
plt.ioff()
plt.show()
从第15-19行可以看出,在每次迭代中往前移动truncated_backprop_length
步,但可能有不同的stride值。这样做的缺点是,为了封装相关的训练数据,truncated_backprop_length
的值要显著大于时间依赖值(本文中为3步),否则可能会丢失很多有效信息,如图6所示。
图6:数据示意图
我们用多个正方形来代表时间序列,上升的黑色方块表示回波输出,由输入回波(黑色方块)经过三次激活后得到。滑动批处理窗口在每次运行时也滑动了三次,在示例中之前没有任何批数据,用来封装依赖关系,因此它不能进行训练。
请注意,本文只是用一个简单示例解释了RNN如何工作,可以轻松地用几行代码中来实现此网络。此网络将能够准确地了解回声行为,因此不需要任何测试数据。
在训练过程中,该程序实时更新图表,如图7所示。蓝色条表示用于训练的输入信号,红色条表示训练得到的输出回波,绿色条是RNN网络产生的预测回波。不同的条形图显示了在当前批次中多个批数据的预测回波。
我们的算法能很快地完成训练任务。左上角的图表输出了损失函数,但为什么曲线上有尖峰?答案就在下面。
图7:各图分别为Loss,训练的输入和输出数据(蓝色和红色)以及预测回波(绿色)。
尖峰的产生原因是在新的迭代开始时,会产生新的数据。由于矩阵重构,每行上的第一个元素与上一行中的最后一个元素会相邻。但是所有行中的前几个元素(第一个除外)都具有不包含在该状态中的依赖关系,因此在最开始的批处理中,网络的预测功能不良。
整个程序
这是完整实现RNN网络的程序,只需复制粘贴即可运行。如果对文章有什么疑问,欢迎加量子位小助手qbitbot,注明“加入门群”并做个自我介绍,小助手将带你和更多小伙伴交流讨论。
from __future__ import print_function, divisionimport numpy as npimport tensorflow as tfimport matplotlib.pyplot as plt
num_epochs = 100total_series_length = 50000truncated_backprop_length = 15state_size = 4num_classes = 2echo_step = 3batch_size = 5num_batches = total_series_length//batch_size//truncated_backprop_lengthdef generateData():
x = np.array(np.random.choice(2, total_series_length, p=[0.5, 0.5]))
y = np.roll(x, echo_step)
y[0:echo_step] = 0
x = x.reshape((batch_size, -1)) # The first index changing slowest, subseries as rows
y = y.reshape((batch_size, -1)) return (x, y)
batchX_placeholder = tf.placeholder(tf.float32, [batch_size, truncated_backprop_length])
batchY_placeholder = tf.placeholder(tf.int32, [batch_size, truncated_backprop_length])
init_state = tf.placeholder(tf.float32, [batch_size, state_size])
W = tf.Variable(np.random.rand(state_size+1, state_size), dtype=tf.float32)
b = tf.Variable(np.zeros((1,state_size)), dtype=tf.float32)
W2 = tf.Variable(np.random.rand(state_size, num_classes),dtype=tf.float32)
b2 = tf.Variable(np.zeros((1,num_classes)), dtype=tf.float32)# Unpack columnsinputs_series = tf.unpack(batchX_placeholder, axis=1)
labels_series = tf.unpack(batchY_placeholder, axis=1)# Forward passcurrent_state = init_state
states_series = []for current_input in inputs_series:
current_input = tf.reshape(current_input, [batch_size, 1])
input_and_state_concatenated = tf.concat(1, [current_input, current_state]) # Increasing number of columns
next_state = tf.tanh(tf.matmul(input_and_state_concatenated, W) + b) # Broadcasted addition
states_series.append(next_state)
current_state = next_state
logits_series = [tf.matmul(state, W2) + b2 for state in states_series] #Broadcasted additionpredictions_series = [tf.nn.softmax(logits) for logits in logits_series]
losses = [tf.nn.sparse_softmax_cross_entropy_with_logits(logits, labels) for logits, labels in zip(logits_series,labels_series)]
total_loss = tf.reduce_mean(losses)
train_step = tf.train.AdagradOptimizer(0.3).minimize(total_loss)def plot(loss_list, predictions_series, batchX, batchY):
plt.subplot(2, 3, 1)
plt.cla()
plt.plot(loss_list) for batch_series_idx in range(5):
one_hot_output_series = np.array(predictions_series)[:, batch_series_idx, :]
single_output_series = np.array([(1 if out[0] < 0.5 else 0) for out in one_hot_output_series])
plt.subplot(2, 3, batch_series_idx + 2)
plt.cla()
plt.axis([0, truncated_backprop_length, 0, 2])
left_offset = range(truncated_backprop_length)
plt.bar(left_offset, batchX[batch_series_idx, :], width=1, color="blue")
plt.bar(left_offset, batchY[batch_series_idx, :] * 0.5, width=1, color="red")
plt.bar(left_offset, single_output_series * 0.3, width=1, color="green")
plt.draw()
plt.pause(0.0001)with tf.Session() as sess:
sess.run(tf.initialize_all_variables())
plt.ion()
plt.figure()
plt.show()
loss_list = [] for epoch_idx in range(num_epochs):
x,y = generateData()
_current_state = np.zeros((batch_size, state_size))
print("New data, epoch", epoch_idx) for batch_idx in range(num_batches):
start_idx = batch_idx * truncated_backprop_length
end_idx = start_idx + truncated_backprop_length
batchX = x[:,start_idx:end_idx]
batchY = y[:,start_idx:end_idx]
_total_loss, _train_step, _current_state, _predictions_series = sess.run(
[total_loss, train_step, current_state, predictions_series],
feed_dict={
batchX_placeholder:batchX,
batchY_placeholder:batchY,
init_state:_current_state
})
loss_list.append(_total_loss) if batch_idx%100 == 0:
print("Step",batch_idx, "Loss", _total_loss)
plot(loss_list, _predictions_series, batchX, batchY)
plt.ioff()
plt.show()