4.3.3.5 代码编写
1、模型一些参数确定
def __init__(self, Tx=30, Ty=10, n_x=32, n_y=64): # 定义网络的相关参数 self.model_param = { "Tx": Tx, # 定义encoder序列最大长度 "Ty": Ty, # decoder序列最大长度 "n_x": n_x, # encoder的隐层输出值大小 "n_y": n_y # decoder的隐层输出值大小和cell输出值大小 }
对于加载数据来说,我们不需要去进行编写逻辑了,看一下大概的逻辑.
2、加载数据
加载的代码逻辑在nmt_utils当中
from nmt_utils import *
添加一些参数到model_param中
# 添加特征词个不重复个数以及目标词的不重复个数 self.model_param["x_vocab"] = x_vocab self.model_param["y_vocab"] = y_vocab self.model_param["x_vocab_size"] = len(x_vocab) self.model_param["y_vocab_size"] = len(y_vocab)
我们先看整个训练的逻辑,并从中来实现整个模型的定义,计算逻辑
dataset:[('9 may 1998', '1998-05-09'), ('10.09.70', '1970-09-10')] x_vocab:翻译前的格式对应数字{' ': 0, '.': 1, '/': 2, '0': 3, '1': 4, '2': 5, '3': 6, '4': 7,...........} y_vocab:翻译后的格式对应数字{'-': 0, '0': 1, '1': 2, '2': 3, '3': 4, '4': 5, '5': 6, '6': 7, '7': 8, '8': 9, '9': 10} # 添加特征词个不重复个数以及目标词的不重复个数 self.model_param["x_vocab"] = x_vocab self.model_param["y_vocab"] = y_vocab self.model_param["x_vocab_size"] = len(x_vocab) self.model_param["y_vocab_size"] = len(y_vocab)
3、训练步骤
(1)定义好网络的输入输出格式
(2)定义好优化器(选择Adam,参数lr=0.005, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0.001)
- from keras.optimizers import Adam
- model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
(3)模型训练,
- model.fit(inputs, outputs, epochs=1,batch_size=100)
def train(self, X_onehot, Y_onehot): """ 训练 :param X_onehot: 特征值的one_hot编码 :param Y_onehot: 目标值的one_hot编码 :return: """ # 利用网络结构定义好模型输入输出 model = self.model() opt = Adam(lr=0.005, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0.001) model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy']) s0 = np.zeros((10000, self.model_param["n_y"])) c0 = np.zeros((10000, self.model_param["n_y"])) outputs = list(Y_onehot.swapaxes(0, 1)) # 输入x,以及decoder中LSTM的两个初始化值 model.fit([X_onehot, s0, c0], outputs, epochs=1, batch_size=100) return None
(1)定义网络好的输入到输出流程
- 步骤1、定义模型的输入
- 步骤2:使用encoder的双向LSTM结构得输出a
- 步骤3:循环decoder的Ty次序列输入,获取decoder最后输出
1: 定义decoder第t'时刻的注意力结构并输出context
context = self.computer_one_attention(a, s)(需要实现Attention结构的计算过程)
2: 对"context" 和初始两个状态s0,c0输入到deocder当中,返回两个输出
s, _, c = self.decoder(context, initial_state=[s, c])
3: 应用 Dense layere获取deocder的t'时刻的输出 out = self.output_layer(s)
- 步骤 4: 创建model实例,定义输入输出
- from keras.models import Model
def model(self): """ 定义模型获取模型实例 :param model_param: 网络的相关参数 :param seq2seq:网络结构 :return: model,Keras model instance """ # 步骤1、定义模型的输入 # 定义decoder中隐层初始状态值s0以及cell输出c0 X = Input(shape=(self.model_param["Tx"], self.model_param["x_vocab_size"]), name='X') # 定义deocder初始化状态 s0 = Input(shape=(self.model_param["n_y"],), name='s0') c0 = Input(shape=(self.model_param["n_y"],), name='c0') s = s0 c = c0 # 定义装有输出值的列表 outputs = [] # 步骤2:使用encoder的双向LSTM结构得输出a a = self.encoder(X) # 步骤3:循环decoder的Ty次序列输入,获取decoder最后输出 # 包括计算Attention输出 for t in range(self.model_param["Ty"]): # 1: 定义decoder第t'时刻的注意力结构并输出context context = self.computer_one_attention(a, s) # 2: 对"context" vector输入到deocder当中 # 获取cell的两个输出隐层状态和,initial_state= [previous hidden state, previous cell state] s, _, c = self.decoder(context, initial_state=[s, c]) # 3: 应用 Dense layere获取deocder的t'时刻的输出 out = self.output_layer(s) # 4: 将decoder中t'时刻的输出装入列表 outputs.append(out) # 步骤 4: 创建model实例,定义输入输出 model = Model(inputs=(X, s0, c0), outputs=outputs) return model
3、模型初始化模型结构定义
在训练中有一些模型结构,所以现需要定义这些结构统一初始化,这些模型结构作为整个Seq2Seq类的属性,初始化逻辑。
def init_seq2seq(self): """ 初始化网络结构 :return: """ # 添加encoder属性 self.get_encoder() # 添加decoder属性 self.get_decoder() # 添加attention属性 self.get_attention() # 添加get_output_layer属性 self.get_output_layer() return None
定义编解码器、Attention机制、输出层
Keras是一个高级神经网络API,用Python编写,能够在TensorFlow之上运行。它的开发重点是实现快速实验。能够以最小的延迟从想法到结果是进行良好研究的关键。
如果您需要深度学习库,请使用Keras:允许简单快速的原型设计(通过用户友好性,模块化和可扩展性)
- 编码器
- 编码器:使用双向LSTM(隐层传递有双向值传递)
- from keras.layers import LSTM, Bidirectional
- LSTM(units, return_sequences=False,name="")
- units: 正整数, units状态输出的维度
- return_sequences:布尔类型 是否返回输出序列
- return:LSTM layer
- Bidirectional(layer, merge_mode='concat')
- 对RNN、LSTM进行双向装饰
- layer:RNN layer或者LSTM layer
- merge_mode:将RNN/LSTM的前向和后向输出值进行合并
- {'sum', 'mul', 'concat', 'ave', None}
def get_encoder(self): """ 定义编码器结构 :return: """ # 指定隐层值输出的大小 self.encoder = Bidirectional(LSTM(self.model_param["n_x"], return_sequences=True, name='bidirectional_1'), merge_mode='concat') return None
- 解码器
- return_state: 布尔,是否返回输出以及状态值
- if return_state: 第一个值是output. 后面的值是状态值shape 为 (batch_size, units).
def get_decoder(self): """ 定义解码器结构 :return: """ # 定义decoder结构,指定隐层值的形状大小,return_state=True self.decoder = LSTM(self.model_param["n_y"], return_state=True) return None
- 输出层
- from keras.layer import Dense
- 指定一个普通的全连接层,并且可以指定激活函数
- Dense(units, activation=None)
- 神经元个数(输出大小)
- activation=None:激活函数
def get_output_layer(self): """ 定义输出层 :return: output_layer """ # 对decoder输出进行softmax,输出向量大小为y_vocab大小 self.output_layer = Dense(self.model_param["y_vocab_size"], activation=softmax) return None
- computer_one_attention函数实现:attention层结构
- 1、定义结构
- 2、实现输入输出结果
- \alpha_{t' t} = \frac{\exp(e_{t' t})}{ \sum_{k=1}^T \exp(e_{t' k}) },\quad t=1,\ldots,Tαt′t=∑k=1Texp(et′k)exp(et′t),t=1,…,T
- e_{t' t} = g({s}_{t' - 1}, {h}_t)= {v}^\top \tanh({W}_s {s} + {W}_h {h})et′t=g(st′−1,ht)=v⊤tanh(Wss+Whh)
- from keras.layers import RepeatVector, Concatenate, Dot, Activation
def get_attention(self): """ 定义Attention的结构 :return: attention结构 """ # 定义RepeatVector复制成多个维度 repeator = RepeatVector(self.model_param["Tx"]) # 进行矩阵拼接 concatenator = Concatenate(axis=-1) # 进行全连接层10个神经元 densor1 = Dense(10, activation="tanh", name='Dense1') # 接着relu函数 densor2 = Dense(1, activation="relu", name='Dense2') # softmax activator = Activation(softmax, name='attention_weights') # context计算 dotor = Dot(axes=1) # 将结构存储在attention当中 self.attention = { "repeator": repeator, "concatenator": concatenator, "densor1": densor1, "densor2": densor2, "activator": activator, "dotor": dotor } return None
- Attention输入输出逻辑
- 使用Attention结构去实现输入到输出的逻辑
def computer_one_attention(self, a, s_prev): """ 利用定义好的attention结构计算中的alpha系数与a对应输出 :param a:隐层状态值 (m, Tx, 2*n_a) :param s_prev: LSTM的初始隐层状态值, 形状(sample, n_s) :return: context """ # 使用repeator扩大数据s_prev的维度为(sample, Tx, n_y),这样可以与a进行合并 s_prev = self.attention["repeator"](s_prev) # 将a和s_prev 按照最后一个维度进行合并计算 concat = self.attention["concatenator"]([a, s_prev]) # 使用densor1全连接层网络计算出e e = self.attention["densor1"](concat) # 使用densor2增加relu激活函数计算 energies = self.attention["densor2"](e) # 使用"activator"的softmax函数计算权重"alphas" # 这样一个attention的系数计算完成 alphas = self.attention["activator"](energies) # 使用dotor,矩阵乘法,将 "alphas" and "a" 去计算context/c context = self.attention["dotor"]([alphas, a]) return context
4、测试逻辑
model.load_weights(path):加载模型
def test(self): """ 测试 :return: """ model = self.model() model.load_weights("./models/model.h5") example = '1 March 2001' source = string_to_int(example, self.model_param["Tx"], self.model_param["x_vocab"]) source = np.expand_dims(np.array(list(map(lambda x: to_categorical(x, num_classes=self.model_param["x_vocab_size"]), source))), axis=0) s0 = np.zeros((10000, self.model_param["n_y"])) c0 = np.zeros((10000, self.model_param["n_y"])) prediction = model.predict([source, s0, c0]) prediction = np.argmax(prediction, axis=-1) output = [dict(zip(self.model_param["y_vocab"].values(), self.model_param["y_vocab"].keys()))[int(i)] for i in prediction] print("source:", example) print("output:", ''.join(output)) return None