1. Chinese Named Entity Recognition with Bi-LSTM and CRF
Named entity recognition (NER) identifies entities with a specific meaning in text, mainly person names, place names, organization names, and other proper nouns. This project implements a simple NER approach: a BiLSTM+CRF model predicts a tag for each character of the text, and the entities are then extracted from those tags.
2. The Dataset
The dataset contains roughly 27,000 Chinese sentences: about 20,800 for training, 2,300 for validation, and 4,600 for testing. The three files are named example.train, example.dev, and example.test and are stored in the datasets directory.
- 1. Training set: text with its corresponding labels, used to train the model.
- 2. Validation set: text with its corresponding labels, used during training for parameter tuning.
- 3. Test set: text with its corresponding labels, used for prediction and for evaluating the results.
The data is annotated with three entity types, person, location, and organization, and the tag set is {'O', 'B-PER', 'I-PER', 'B-ORG', 'I-ORG', 'B-LOC', 'I-LOC'}.
Here 'O' marks a character that is not part of an entity, 'B-' marks the first character of an entity, 'I-' marks the remaining characters of an entity, and 'PER', 'ORG', and 'LOC' stand for person, organization, and location respectively. IOB (Inside-Outside-Beginning) is a common tagging format for this kind of token-level annotation.
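For reference, the read_data function in the complete source parses each line as one character followed by a space and its tag, with a blank line separating sentences. A made-up fragment in that layout (the sentence itself is only illustrative) looks like this:

```
约 B-PER
翰 I-PER
去 O
北 B-LOC
京 I-LOC
。 O
```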
3. The BiLSTM+CRF Model
First, every token in a sentence is represented by a vector that combines a word embedding and a character embedding; the word embeddings are usually pre-trained, while the character embeddings are randomly initialized. All embeddings are fine-tuned during training. The BiLSTM-CRF then takes these embedding vectors as input and outputs a predicted tag for every token. Even without the CRF layer we could still train a BiLSTM-based NER model, but the CRF layer adds constraints that keep the final predictions valid, and these constraints are learned automatically by the CRF layer from the training data.
Possible constraints include:
- A sentence should start with 'B-' or 'O', never with 'I-'.
- In a pattern 'B-label1 I-label2 I-label3 ...', label1, label2, and label3 should all be the same entity type. For example, 'B-Person I-Person' is valid, while 'B-Person I-Organization' is not.
- 'O I-label' is invalid: an entity should begin with 'B-', not 'I-'.
With these constraints, the number of invalid predicted sequences drops sharply.
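As an illustration (this helper is not part of the model itself), the constraints above can be written down as a simple validity check over a BIO tag sequence:

```python
def is_valid_bio(tags):
    """Return True if a BIO tag sequence respects the transition constraints."""
    prev = 'O'
    for tag in tags:
        if tag.startswith('I-'):
            # 'I-X' may only follow 'B-X' or 'I-X' of the same entity type;
            # this also rules out starting a sentence with 'I-'.
            if prev == 'O' or prev[2:] != tag[2:]:
                return False
        prev = tag
    return True


print(is_valid_bio(['B-PER', 'I-PER', 'O']))  # True
print(is_valid_bio(['O', 'I-LOC']))           # False
print(is_valid_bio(['B-PER', 'I-ORG']))       # False
```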
4. Defining the Bi-LSTM Network
```python
def __init__(self, vocab_size, tag_to_ix, embedding_dim, hidden_dim):
    super(BiLSTM_CRF, self).__init__()
    self.vocab_size = vocab_size
    self.tag_to_ix = tag_to_ix
    self.embedding_dim = embedding_dim
    self.hidden_dim = hidden_dim
    self.tagset_size = len(tag_to_ix)
    # +1 so that the padding index (len(vocab)) also gets an embedding
    self.word_embeds = nn.Embedding(self.vocab_size + 1, self.embedding_dim)
    # bidirectional LSTM: each direction uses hidden_dim // 2 units
    self.BiLSTM = nn.LSTM(embedding_dim, hidden_size=hidden_dim // 2,
                          bidirectional=True, num_layers=1)
    # maps each character's LSTM output to per-tag emission scores
    self.hidden2tag = nn.Linear(hidden_dim, self.tagset_size)
    # transitions[i, j]: score of moving from tag j to tag i
    self.transitions = nn.Parameter(torch.randn([self.tagset_size, self.tagset_size]))
    self.hidden = self.init_hidden()
```
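The constructor above calls init_hidden, and the emission scores consumed by the CRF come from _get_lstm_features; both methods are part of the complete source at the end of the article and are reproduced here with shape comments for readability:

```python
def init_hidden(self):
    # initial LSTM state: [num_layers * num_directions, batch, hidden_size]
    return (torch.randn([2, 1, self.hidden_dim // 2]),
            torch.randn([2, 1, self.hidden_dim // 2]))


def _get_lstm_features(self, sentence):
    self.hidden = self.init_hidden()
    # [seq_len] character ids -> [seq_len, batch=1, embedding_dim]
    embeds = self.word_embeds(sentence).view([len(sentence), 1, -1])
    BiLSTM_out, self.hidden = self.BiLSTM(embeds, self.hidden)
    # [seq_len, hidden_dim] -> [seq_len, tagset_size] emission scores
    BiLSTM_out = BiLSTM_out.view([len(sentence), self.hidden_dim])
    BiLSTM_feats = self.hidden2tag(BiLSTM_out)
    return BiLSTM_feats
```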
5. Defining the CRF Layer
```python
def neg_log_likelihood(self, sentence, tags):
    # emission scores produced by the BiLSTM
    feats = self._get_lstm_features(sentence)
    # log of the partition function (sum over all possible tag sequences)
    forward_score = self._forward_alg(feats)
    # score of the gold tag sequence
    gold_score = self._score_sentence(feats, tags)
    # negative log-likelihood of the gold sequence under the CRF
    return forward_score - gold_score
```
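neg_log_likelihood is the CRF training loss: the log-partition score computed by the forward algorithm minus the score of the gold tag sequence. The forward algorithm accumulates scores in log space with the log_sum_exp helper from the complete source; it is shown here with a comment on the numerical-stability trick it uses:

```python
import torch


def argmax(vec):
    # index of the largest entry in a [1, tagset_size] score vector
    _, idx = torch.max(vec, 1)
    return idx.item()


def log_sum_exp(vec):
    # numerically stable log(sum(exp(vec))): subtracting the maximum before
    # exponentiating avoids overflow, and the identity
    # log(sum(exp(v))) = max(v) + log(sum(exp(v - max(v)))) restores the value
    max_score = vec[0, argmax(vec)]
    max_score_broadcast = max_score.view([1, -1]).expand([1, vec.size()[1]])
    return max_score + torch.log(torch.sum(torch.exp(vec - max_score_broadcast)))
```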
6. Ideas for Further Improvement
- Cascaded or stacked combinations of statistical learning methods, either between different methods or within one method.
- Fusing rules, dictionaries, and machine learning methods; the core issue is the fusion technique itself. For instance, a set of rules can be introduced into a statistics-based learning method, combining machine learning with human knowledge.
- Chaining different models and algorithms: the output of one stage is used as training data for the next stage, and the next-stage model is trained on that data.
Whichever of these approaches is taken, the implementation has to work out how to combine the methods efficiently and which fusion technique to adopt. Since named entity recognition relies heavily on classification, the main fusion techniques on the classification side include voting, grading, and similar schemes; a minimal voting sketch is given below.
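As an illustration of the voting idea (this helper is not part of the original code), the sketch below majority-votes the per-character tags predicted by several hypothetical models:

```python
from collections import Counter


def vote_tags(predictions):
    """Majority-vote per-character tags.

    predictions: a list of tag sequences, one per model, all of the same length.
    Ties fall back to the first model's tag. Note that the voted sequence may
    still need a BIO validity check such as the one shown earlier.
    """
    voted = []
    for position_tags in zip(*predictions):
        tag, count = Counter(position_tags).most_common(1)[0]
        voted.append(tag if count > 1 else position_tags[0])
    return voted


# hypothetical outputs of three models for the same three-character sentence
print(vote_tags([['B-PER', 'I-PER', 'O'],
                 ['B-PER', 'O', 'O'],
                 ['B-PER', 'I-PER', 'O']]))  # ['B-PER', 'I-PER', 'O']
```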
Complete Source Code
```python
import numpy as np
import torch
import torch.nn as nn
from torch import optim
from torch.utils.data import TensorDataset, DataLoader
from tqdm import tqdm

num_layers = 1        # number of LSTM layers
hidden_dim = 100      # LSTM hidden size
epochs = 50           # number of training epochs
batch_size = 32       # samples per batch
embedding_dim = 15    # embedding size per character
lr = 0.01             # learning rate
device = 'cpu'


# Load the text data
def load_data():
    def get_vocab_list(path_list):
        # collect every distinct character that appears in the data files
        vocab_set = set()
        vocab_list = list()
        for path in path_list:
            with open(path, 'r', encoding='utf-8') as f:
                for line in f:
                    if len(line.strip()) == 0:
                        continue
                    if line[0] not in vocab_set:
                        vocab_set.add(line[0])
                        vocab_list.append(line[0])
        return vocab_list

    def save_vocab(path, vocab_list):
        # write one character per line
        output = ''.join([vocab + '\n' for vocab in vocab_list])
        with open(path, 'w', encoding='utf-8') as f:
            f.write(output)

    def get_string2id(path):
        # build string <-> id mappings from a file with one item per line
        string2id = {}
        id2string = []
        with open(path, 'r', encoding='utf-8') as f:
            for line in f:
                string2id[line.strip()] = len(string2id)
                id2string.append(line.strip())
        return id2string, string2id

    def get_sequence_len(path):
        sequence_len = list()
        tmp_len = 0
        with open(path, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if len(line) == 0:
                    # blank line: the previous sentence has ended, its length is tmp_len
                    sequence_len.append(tmp_len)
                    tmp_len = 0
                else:
                    tmp_len += 1
        return np.array(sequence_len)

    def read_data(path, vocab2id, label2id, max_len):
        data_x = list()
        data_y = list()
        tmp_text = list()
        tmp_label = list()
        with open(path, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if len(line) == 0:
                    # pad characters with the extra index len(vocab2id) and labels with 0
                    tmp_text += [len(vocab2id)] * (max_len - len(tmp_text))
                    tmp_label += [0] * (max_len - len(tmp_label))
                    data_x.append(tmp_text)
                    data_y.append(tmp_label)
                    tmp_text = list()
                    tmp_label = list()
                else:
                    # each line is "<character> <tag>"
                    tmp_text.append(vocab2id[line[0]])
                    tmp_label.append(label2id[line[2:]])
        print('{} contains {} sequences'.format(path, len(data_x)))
        return np.array(data_x), np.array(data_y)

    vocab_list = get_vocab_list(['./datasets/example.train',
                                 './datasets/example.dev',
                                 './datasets/example.test'])
    save_vocab('./vocab.txt', vocab_list)
    id2label, label2id = get_string2id('./datasets/labels.txt')
    id2word, word2id = get_string2id('./vocab.txt')
    print(len(label2id), len(word2id))

    train_sequence_len = get_sequence_len('./datasets/example.train')
    dev_sequence_len = get_sequence_len('./datasets/example.dev')
    test_sequence_len = get_sequence_len('./datasets/example.test')
    max_len = max(max(train_sequence_len), max(dev_sequence_len), max(test_sequence_len))
    print(max_len)

    train_text, train_label = read_data('./datasets/example.train', word2id, label2id, max_len)
    dev_text, dev_label = read_data('./datasets/example.dev', word2id, label2id, max_len)
    test_text, test_label = read_data('./datasets/example.test', word2id, label2id, max_len)

    # label2id is also returned so the CRF can map tag strings to indices
    return train_text, train_label, test_text, test_label, id2label, label2id, id2word


# 1. Load the dataset
train_text, train_label, test_text, test_label, id2label, label2id, id2word = load_data()
print('training set', train_text.shape, train_label.shape)

# 2. Convert the numpy arrays to tensors
x_train = torch.from_numpy(train_text).to(torch.long)
y_train = torch.from_numpy(train_label).to(torch.long)
x_test = torch.from_numpy(test_text).to(torch.long)
y_test = torch.from_numpy(test_label).to(torch.long)
# optionally train on a small subset for quick debugging:
# x_train = x_train[:32]
# y_train = y_train[:32]

# 3. Build the datasets
train_data = TensorDataset(x_train, y_train)
test_data = TensorDataset(x_test, y_test)

# 4. Wrap the datasets in iterators (the training loop below processes
#    one sentence at a time, so the loaders are kept only for completeness)
train_loader = DataLoader(train_data, batch_size, True)
test_loader = DataLoader(test_data, batch_size, False)

# the CRF reuses the 'O' tag as both the start and stop tag,
# since labels.txt contains no dedicated START/STOP tags
START_TAG = "O"
STOP_TAG = "O"


# 5. Helper functions
def argmax(vec):
    # index of the largest score in a [1, tagset_size] vector
    _, idx = torch.max(vec, 1)
    return idx.item()


def prepare_sequence(seq, to_ix):
    idxs = [to_ix[word] for word in seq]
    return torch.tensor(idxs, dtype=torch.long)


def log_sum_exp(vec):
    # numerically stable log(sum(exp(vec)))
    max_score = vec[0, argmax(vec)]
    max_score_broadcast = max_score.view([1, -1]).expand([1, vec.size()[1]])
    return max_score + torch.log(torch.sum(torch.exp(vec - max_score_broadcast)))


# 6. Define the model
class BiLSTM_CRF(nn.Module):
    def __init__(self, vocab_size, tag_to_ix, embedding_dim, hidden_dim):
        super(BiLSTM_CRF, self).__init__()
        self.vocab_size = vocab_size
        self.tag_to_ix = tag_to_ix
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.tagset_size = len(tag_to_ix)
        # +1 so that the padding index (len(vocab)) also gets an embedding
        self.word_embeds = nn.Embedding(self.vocab_size + 1, self.embedding_dim)
        self.BiLSTM = nn.LSTM(embedding_dim, hidden_size=hidden_dim // 2,
                              bidirectional=True, num_layers=num_layers)
        # maps each character's LSTM output to per-tag emission scores
        self.hidden2tag = nn.Linear(hidden_dim, self.tagset_size)
        # transitions[i, j]: score of moving from tag j to tag i
        self.transitions = nn.Parameter(torch.randn([self.tagset_size, self.tagset_size]))
        self.hidden = self.init_hidden()

    def init_hidden(self):
        # [num_layers * num_directions, batch, hidden_size]
        return (torch.randn([2, 1, self.hidden_dim // 2]),
                torch.randn([2, 1, self.hidden_dim // 2]))

    def _get_lstm_features(self, sentence):
        self.hidden = self.init_hidden()
        embeds = self.word_embeds(sentence).view([len(sentence), 1, -1])
        BiLSTM_out, self.hidden = self.BiLSTM(embeds, self.hidden)
        BiLSTM_out = BiLSTM_out.view([len(sentence), self.hidden_dim])
        BiLSTM_feats = self.hidden2tag(BiLSTM_out)
        return BiLSTM_feats

    def _score_sentence(self, feats, tags):
        # score of the gold tag sequence under the current model
        score = torch.zeros([1])
        tags = torch.cat([torch.tensor([self.tag_to_ix[START_TAG]], dtype=torch.long), tags])
        for i, feat in enumerate(feats):
            score = score + self.transitions[tags[i + 1], tags[i]] + feat[tags[i + 1]]
        score = score + self.transitions[self.tag_to_ix[STOP_TAG], tags[-1]]
        return score

    def _forward_alg(self, feats):
        # log of the partition function, summed over all possible tag sequences
        forward_var = torch.full([1, self.tagset_size], -10000.)
        forward_var[0][self.tag_to_ix[START_TAG]] = 0.  # all paths start from START_TAG
        for feat in feats:
            forward_var_t = []
            for next_tag in range(self.tagset_size):
                emit_score = feat[next_tag].view([1, -1]).expand([1, self.tagset_size])
                trans_score = self.transitions[next_tag].view([1, -1])
                next_tag_var = forward_var + trans_score + emit_score
                forward_var_t.append(log_sum_exp(next_tag_var).view([1]))
            forward_var = torch.cat(forward_var_t).view([1, -1])
        # transition to STOP_TAG, mirroring _score_sentence
        terminal_var = forward_var + self.transitions[self.tag_to_ix[STOP_TAG]]
        return log_sum_exp(terminal_var)

    def neg_log_likelihood(self, sentence, tags):
        feats = self._get_lstm_features(sentence)
        forward_score = self._forward_alg(feats)
        gold_score = self._score_sentence(feats, tags)
        return forward_score - gold_score

    def _viterbi_decode(self, feats):
        # find the highest-scoring tag sequence with the Viterbi algorithm
        forward_var = torch.full([1, self.tagset_size], -10000.)
        forward_var[0][self.tag_to_ix[START_TAG]] = 0
        backpointers = []
        for feat in feats:
            backpointers_t = []
            forward_var_t = []
            for next_tag in range(self.tagset_size):
                next_tag_var = forward_var + self.transitions[next_tag]
                best_tag_id = argmax(next_tag_var)
                backpointers_t.append(best_tag_id)
                forward_var_t.append(next_tag_var[0][best_tag_id].view([1]))
            forward_var = (torch.cat(forward_var_t) + feat).view([1, -1])
            backpointers.append(backpointers_t)
        forward_var = forward_var + self.transitions[self.tag_to_ix[STOP_TAG]]
        best_tag_id = argmax(forward_var)
        path_score = forward_var[0][best_tag_id]
        # follow the backpointers to recover the best path
        best_path = [best_tag_id]
        for backpointers_t in reversed(backpointers):
            best_tag_id = backpointers_t[best_tag_id]
            best_path.append(best_tag_id)
        start = best_path.pop()
        assert self.tag_to_ix[START_TAG] == start
        best_path.reverse()
        return path_score, best_path

    def forward(self, sentence):
        BiLSTM_feats = self._get_lstm_features(sentence)
        score, best_path = self._viterbi_decode(BiLSTM_feats)
        return score, best_path


# 7. Train the model
model = BiLSTM_CRF(vocab_size=len(id2word), tag_to_ix=label2id,
                   embedding_dim=embedding_dim, hidden_dim=hidden_dim)
optimizer = optim.Adam(model.parameters(), lr=lr)

for epoch in range(epochs):
    # iterate over the training set one sentence at a time
    for sentence, tags in tqdm(train_data):
        model.zero_grad()
        targets = tags  # tags is already a LongTensor of label ids
        loss = model.neg_log_likelihood(sentence, targets)
        loss.backward()
        optimizer.step()
    print("[EPOCH] %s" % str(epoch + 1))
    print("training loss: %s" % (str(loss.item())))
```
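The script above only trains the model. To actually obtain entities, the Viterbi path returned by model(sentence) has to be mapped back to tag strings and grouped into spans. Below is a minimal sketch of that step, assuming the model, x_test, id2word, and id2label variables from the script above; the extract_entities helper is illustrative and not part of the original code:

```python
def extract_entities(chars, tags):
    """Group a BIO tag sequence into (entity_text, entity_type) pairs."""
    entities, buffer, ent_type = [], [], None
    for ch, tag in zip(chars, tags):
        if tag.startswith('B-'):
            if buffer:
                entities.append((''.join(buffer), ent_type))
            buffer, ent_type = [ch], tag[2:]
        elif tag.startswith('I-') and buffer and tag[2:] == ent_type:
            buffer.append(ch)
        else:
            if buffer:
                entities.append((''.join(buffer), ent_type))
            buffer, ent_type = [], None
    if buffer:
        entities.append((''.join(buffer), ent_type))
    return entities


# decode one padded test sentence with the trained model
with torch.no_grad():
    sentence = x_test[0]                # character ids, padded to max_len
    score, best_path = model(sentence)  # Viterbi decoding
    chars = [id2word[i] if i < len(id2word) else '' for i in sentence.tolist()]
    tags = [id2label[t] for t in best_path]
    print(extract_entities(chars, tags))
```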