1 语言模型步骤
简单概述:根据输入内容,继续输出后面的句子。
1.1 根据需求拆分任务
- (1)先对模型输入一段文字,令模型输出之后的一个文字。
- (2)将模型预测出来的文字当成输入,再放到模型里,使模型预测出下一个文字,这样循环下去,以使RNN完成一句话的输出。
1.2 根据任务设计功能模块
- (1)模型能够记住前面文字的语义;
- (2)能够根据前面的语义和一个输入文字,输出下一个文字。
1.3 根据功能模块设计实现方案
RNN模型的接口可以输出两个结果:预测值和当前状态
1.在实现时,将输入的序列样本拆开,使用循环的方式,将字符逐个输入模型。模型会对每次的输入预测出两个结果,一个是预测字符,另一个是当前的序列状态。
2.在训练场景下,将硕测字骑用于计算损失,列状动用于传入下一次循环计算。
3.在测试场景下,用循环的方式将输入序列中的文字一个个地传入到模型中,得到最后一个时刻的当前状态,将该状态和输入序列中的最后一个文字转入模型,生成下一个文字的预测结果。同时,按照要求生成的文字条件,重复地将新生成的文字和当前状态输入模型,来预测下一个文字。
2 语言模型的代码实现
2.1 准备样本数据
样本内容:
在尘世的纷扰中,只要心头悬挂着远方的灯光,我们就会坚持不懈地走,理想为我们灌注了精神的蕴藉。所以,生活再平凡、再普通、再琐碎,我们都要坚持一种信念,默守一种精神,为自己积淀站立的信心,前行的气力。
2.1.1定义基本工具函数---make_Language_model.py(第1部分)
首先引入头文件,然后定义相关函数:get_ch_lable()从文件中获取文本,get_ch._able_v0将文本数组转方向量,具体代码如下:
import numpy as np import torch import torch.nn.functional as F import time import random from collections import Counter # 1.1 定义基本的工具函数 RANDOM_SEED = 123 torch.manual_seed(RANDOM_SEED) DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu') def elapsed(sec): # 计算时间函数 if sec < 60: return str(sec) + "sec" elif sec<(60*60): return str(sec/60) + "min" else: return str(sec/(60*60)) + "hour" training_file = 'csv_list/wordstest.txt' # 定义样本文件 #中文字 def get_ch_label(txt_file): # 提取样本中的汉字 labels = "" with open(txt_file,'rb') as f : for label in f: labels = labels + label.decode("gb2312",errors = 'ignore') return labels #中文多文件 def readalltxt(txt_files): # 处理中文 labels = [] for txt_file in txt_files: target = get_ch_label(txt_file) labels.append(target) return labels # 将汉子转化成向量,支持文件和内存对象里的汉字转换 def get_ch_label_v(txt_file,word_num_map,txt_label = None): words_size = len(word_num_map) to_num = lambda word:word_num_map.get(word,words_size) if txt_file != None: txt_label = get_ch_label(txt_file) # 使用to_num()实现单个汉子转成向量的功能,如果没有该汉字,则将返回words_size(值为69) labels_vector = list(map(to_num,txt_label)) # 将汉字列表中的每个元素传入到to_num()进行转换 return labels_vector
2.1.2 样本预处理---make_Language_model.py(第2部分)
样本预处理这个一套指读取整个样本,将其放入training_data里,获取全部的字表words,并生成样本向量wordlabel与向量有对应关系的word_num_map,代码如下:
# 1.2 样本预处理 training_data = get_ch_label(training_file) print("加载训练模型中") print("该样本长度:",len(training_data)) counter = Counter(training_data) words = sorted(counter) words_size = len(words) word_num_map = dict(zip(words,range(words_size))) print("字表大小:",words_size) wordlabel = get_ch_label_v(training_file,word_num_map) # 加载训练模型中 # 该样本长度: 75 # 字表大小: 41(去重)
上述结果表示样本文件里一共有75个文字,其中掉重复的文字之后,还有41个。这41个文字将作为字表词典,建立文字与索引值的对应关系。
在训练模型时,每个文字都会被转化成数字形式的索引值输入模型。模型的输出是这41个文字的概率,即把每个文字当成一类。
2.2 代码实现:构建循环神经网络模型---make_Language_model.py(第3部分)
使用GRU构建RNN模型,令RNN模型只接收一个序列的喻入字符,并预测出下一个序列的字符。
在该模型里,所需要完成的步骤如下:
1.将输入的字索引转为词嵌入;
2.将词嵌入结果输入GRU层;
3.对GRU结果做全连接处理,得到维度为69的预测结果,这个预测结果代表每个文字的概率。
2.2.1 代码实现
# 1.3 构建循环神经网络(RNN)模型 class GRURNN(torch.nn.Module): def __init__(self,word_size,embed_dim,hidden_dim,output_size,num_layers): super(GRURNN, self).__init__() self.num_layers = num_layers self.hidden_dim = hidden_dim self.embed = torch.nn.Embedding(word_size,embed_dim) # 定义一个多层的双向层: # 预测结果:形状为[序列,批次,维度hidden_dim×2],因为是双向RNN,故维度为hidden_dim # 序列状态:形状为[层数×2,批次,维度hidden_dim] self.gru = torch.nn.GRU(input_size=embed_dim, hidden_size=hidden_dim, num_layers=num_layers,bidirectional=True) self.fc = torch.nn.Linear(hidden_dim *2,output_size)# 全连接层,充当模型的输出层,用于对GRU输出的预测结果进行处理,得到最终的分类结果 def forward(self,features,hidden): embeded = self.embed(features.view(1,-1)) output,hidden = self.gru(embeded.view(1,1,-1),hidden) # output = self.attention(output) output = self.fc(output.view(1,-1)) return output,hidden def init_zero_state(self): # 对于GRU层状态的初始化,每次迭代训练之前,需要对GRU的状态进行清空,因为输入的序列是1,故torch.zeros的第二个参数为1 init_hidden = torch.zeros(self.num_layers*2,1,self.hidden_dim).to(DEVICE) return init_hidden
2.3 代码实现:实例化,训练模型--make_Language_model.py(第3部分)
# 1.4 实例化模型类,并训练模型 EMBEDDING_DIM = 10 # 定义词嵌入维度 HIDDEN_DIM = 20 # 定义隐藏层维度 NUM_LAYERS = 1 # 定义层数 # 实例化模型 model = GRURNN(words_size,EMBEDDING_DIM,HIDDEN_DIM,words_size,NUM_LAYERS) model = model.to(DEVICE) optimizer = torch.optim.Adam(model.parameters(),lr=0.005) # 定义测试函数 def evaluate(model,prime_str,predict_len,temperature=0.8): hidden = model.init_zero_state().to(DEVICE) predicted = "" # 处理输入语义 for p in range(len(prime_str) -1): _,hidden = model(prime_str[p],hidden) predicted = predicted + words[predict_len] inp = prime_str[-1] # 获得输入字符 predicted = predicted + words[inp] #按照指定长度输出预测字符 for p in range(predict_len): output,hidden = model(inp,hidden) # 将输入字符和状态传入模型 # 从多项式中分布采样 # 在测试环境下,使用温度的参数和指数计算对模型的输出结果进行微调,保证其数值是大于0的数,小于0,torch.multinomial()会报错 # 同时,使用多项式分布的方式进行采样,生成预测结果 output_dist = output.data.view(-1).div(temperature).exp() inp = torch.multinomial(output_dist,1)[0] # 获取采样结果 predicted = predicted + words[inp] # 将索引转化成汉字保存在字符串中 return predicted # 定义参数并训练 training_iters = 5000 display_step = 1000 n_input = 4 step = 0 offset = random.randint(0,n_input+1) end_offset = n_input + 1 while step < training_iters: # 按照迭代次数训练模型 start_time = time.time() # 计算起始时间 #随机取一个位置偏移 if offset > (len(training_data)-end_offset): offset = random.randint(0,n_input+1) # 制作输入样本 inwords = wordlabel[offset:offset+n_input] inwords = np.reshape(np.array(inwords),[n_input,-1,1]) # 制作标签样本 out_onehot = wordlabel[offset+1:offset+n_input+1] hidden = model.init_zero_state() # RNN的状态清零 optimizer.zero_grad() loss = 0.0 inputs = torch.LongTensor(inwords).to(DEVICE) targets = torch.LongTensor(out_onehot).to(DEVICE) for c in range(n_input): # 按照输入长度将样本预测输入模型并进行预测 outputs,hidden = model(inputs[c],hidden) loss = loss + F.cross_entropy(outputs,targets[c].view(1)) loss = loss / n_input loss.backward() optimizer.step() # 输出日志 with torch.set_grad_enabled(False): if (step+1)%display_step == 0 : print(f'Time elapesd:{(time.time() - start_time)/60:.4f}min') print(f'step {step + 1}|Loss {loss.item():.2f}\n\n') with torch.no_grad(): print(evaluate(model,inputs,32),'\n') print(50*'=') step = step +1 # 每次迭代结束,将偏移值相后移动n_input+1个距离单位,可以保证输入数据的样本相互均匀,否则会出现文本两边的样本训练次数较少的情况。 offset = offset + (n_input+1) print("Finished!")
2.4 代码实现:运行模型生成句子--make_Language_model.py(第4部分)
# 1.5 运行模型生成句子 while True: prompt = "输入几个文字:" sentence = input(prompt) inputword = sentence.strip() try: inputword = get_ch_label_v(None,word_num_map,inputword) keys = np.reshape(np.array(inputword),[len(inputword),-1,1]) # get_ch_label_v()中,如果在字典中找不到对应的索引,就会为其分配一个无效的索引值, # 进而在 evaluate()函数中调用模型的时,差不多对应对的有效词向量而终止报错 model.eval() with torch.no_grad(): sentence = evaluate(model,torch.LongTensor(keys).to(DEVICE),32) print(sentence) except: # 异常处理,当输入的文字不在模型字典中时,系统会报错,有意设置,防止输入超范围的字词 print("还没学会")
3 代码总览--make_Language_model.py
import numpy as np import torch import torch.nn.functional as F import time import random from collections import Counter # 1.1 定义基本的工具函数 RANDOM_SEED = 123 torch.manual_seed(RANDOM_SEED) DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu') def elapsed(sec): # 计算时间函数 if sec < 60: return str(sec) + "sec" elif sec<(60*60): return str(sec/60) + "min" else: return str(sec/(60*60)) + "hour" training_file = 'csv_list/wordstest.txt' # 定义样本文件 #中文字 def get_ch_label(txt_file): # 提取样本中的汉字 labels = "" with open(txt_file,'rb') as f : for label in f: labels = labels + label.decode("gb2312",errors = 'ignore') return labels #中文多文件 def readalltxt(txt_files): # 处理中文 labels = [] for txt_file in txt_files: target = get_ch_label(txt_file) labels.append(target) return labels # 将汉子转化成向量,支持文件和内存对象里的汉字转换 def get_ch_label_v(txt_file,word_num_map,txt_label = None): words_size = len(word_num_map) to_num = lambda word:word_num_map.get(word,words_size) if txt_file != None: txt_label = get_ch_label(txt_file) # 使用to_num()实现单个汉子转成向量的功能,如果没有该汉字,则将返回words_size(值为69) labels_vector = list(map(to_num,txt_label)) # 将汉字列表中的每个元素传入到to_num()进行转换 return labels_vector # 1.2 样本预处理 training_data = get_ch_label(training_file) print("加载训练模型中") print("该样本长度:",len(training_data)) counter = Counter(training_data) words = sorted(counter) words_size = len(words) word_num_map = dict(zip(words,range(words_size))) print("字表大小:",words_size) wordlabel = get_ch_label_v(training_file,word_num_map) # 加载训练模型中 # 该样本长度: 75 # 字表大小: 41(去重) # 1.3 构建循环神经网络(RNN)模型 class GRURNN(torch.nn.Module): def __init__(self,word_size,embed_dim,hidden_dim,output_size,num_layers): super(GRURNN, self).__init__() self.num_layers = num_layers self.hidden_dim = hidden_dim self.embed = torch.nn.Embedding(word_size,embed_dim) # 定义一个多层的双向层: # 预测结果:形状为[序列,批次,维度hidden_dim×2],因为是双向RNN,故维度为hidden_dim # 序列状态:形状为[层数×2,批次,维度hidden_dim] self.gru = torch.nn.GRU(input_size=embed_dim, hidden_size=hidden_dim, num_layers=num_layers,bidirectional=True) self.fc = torch.nn.Linear(hidden_dim *2,output_size)# 全连接层,充当模型的输出层,用于对GRU输出的预测结果进行处理,得到最终的分类结果 def forward(self,features,hidden): embeded = self.embed(features.view(1,-1)) output,hidden = self.gru(embeded.view(1,1,-1),hidden) # output = self.attention(output) output = self.fc(output.view(1,-1)) return output,hidden def init_zero_state(self): # 对于GRU层状态的初始化,每次迭代训练之前,需要对GRU的状态进行清空,因为输入的序列是1,故torch.zeros的第二个参数为1 init_hidden = torch.zeros(self.num_layers*2,1,self.hidden_dim).to(DEVICE) return init_hidden # 1.4 实例化模型类,并训练模型 EMBEDDING_DIM = 10 # 定义词嵌入维度 HIDDEN_DIM = 20 # 定义隐藏层维度 NUM_LAYERS = 1 # 定义层数 # 实例化模型 model = GRURNN(words_size,EMBEDDING_DIM,HIDDEN_DIM,words_size,NUM_LAYERS) model = model.to(DEVICE) optimizer = torch.optim.Adam(model.parameters(),lr=0.005) # 定义测试函数 def evaluate(model,prime_str,predict_len,temperature=0.8): hidden = model.init_zero_state().to(DEVICE) predicted = "" # 处理输入语义 for p in range(len(prime_str) -1): _,hidden = model(prime_str[p],hidden) predicted = predicted + words[predict_len] inp = prime_str[-1] # 获得输入字符 predicted = predicted + words[inp] #按照指定长度输出预测字符 for p in range(predict_len): output,hidden = model(inp,hidden) # 将输入字符和状态传入模型 # 从多项式中分布采样 # 在测试环境下,使用温度的参数和指数计算对模型的输出结果进行微调,保证其数值是大于0的数,小于0,torch.multinomial()会报错 # 同时,使用多项式分布的方式进行采样,生成预测结果 output_dist = output.data.view(-1).div(temperature).exp() inp = torch.multinomial(output_dist,1)[0] # 获取采样结果 predicted = predicted + words[inp] # 将索引转化成汉字保存在字符串中 return predicted # 定义参数并训练 training_iters = 5000 display_step = 1000 n_input = 4 step = 0 offset = random.randint(0,n_input+1) end_offset = n_input + 1 while step < training_iters: # 按照迭代次数训练模型 start_time = time.time() # 计算起始时间 #随机取一个位置偏移 if offset > (len(training_data)-end_offset): offset = random.randint(0,n_input+1) # 制作输入样本 inwords = wordlabel[offset:offset+n_input] inwords = np.reshape(np.array(inwords),[n_input,-1,1]) # 制作标签样本 out_onehot = wordlabel[offset+1:offset+n_input+1] hidden = model.init_zero_state() # RNN的状态清零 optimizer.zero_grad() loss = 0.0 inputs = torch.LongTensor(inwords).to(DEVICE) targets = torch.LongTensor(out_onehot).to(DEVICE) for c in range(n_input): # 按照输入长度将样本预测输入模型并进行预测 outputs,hidden = model(inputs[c],hidden) loss = loss + F.cross_entropy(outputs,targets[c].view(1)) loss = loss / n_input loss.backward() optimizer.step() # 输出日志 with torch.set_grad_enabled(False): if (step+1)%display_step == 0 : print(f'Time elapesd:{(time.time() - start_time)/60:.4f}min') print(f'step {step + 1}|Loss {loss.item():.2f}\n\n') with torch.no_grad(): print(evaluate(model,inputs,32),'\n') print(50*'=') step = step +1 # 每次迭代结束,将偏移值相后移动n_input+1个距离单位,可以保证输入数据的样本相互均匀,否则会出现文本两边的样本训练次数较少的情况。 offset = offset + (n_input+1) print("Finished!") # 1.5 运行模型生成句子 while True: prompt = "输入几个文字:" sentence = input(prompt) inputword = sentence.strip() try: inputword = get_ch_label_v(None,word_num_map,inputword) keys = np.reshape(np.array(inputword),[len(inputword),-1,1]) # get_ch_label_v()中,如果在字典中找不到对应的索引,就会为其分配一个无效的索引值, # 进而在 evaluate()函数中调用模型的时,差不多对应对的有效词向量而终止报错 model.eval() with torch.no_grad(): sentence = evaluate(model,torch.LongTensor(keys).to(DEVICE),32) print(sentence) except: # 异常处理,当输入的文字不在模型字典中时,系统会报错,有意设置,防止输入超范围的字词 print("还没学会")
模型结果:训练的并不好,但是还能用