本文摘要·
理论来源:
【统计自然语言处理】第七章 自动分词;【统计学习方法】第十章 隐马尔可夫模型
· 代码目的:手写HMM进行中文分词
作者:CSDN 征途黯然.
一、数据集
数据集的形式如下:
新 B 华 M 社 E 北 B 京 E 二 B 月 E 十 B 二 M 日 E 电 S 中 B 国 E 十 B 四 E ……
数据集已经标注好了四种状态(B、M、E、S),每个句子之间用换行分割。
获取本数据集或者代码工程,可以关注公众号‘三黄工作室’回复‘中文分词’。
二、代码介绍
1. 定义一个类,HmmModel
。
2. 类中定义属性,分词状态self.STATE
,状态转移矩阵self.A_dict
,发射矩阵self.B_dict
,初始矩阵self.Pi_dict
。
3. 类中函数load,先加载序列化的中间模型,如果中间模型数据不存在,则去加载语料库,重新训练,训练好了之后,把中间数据序列化保存成.pkl文件。
这里的中间模型,指的是状态转移矩阵self.A_dict,发射矩阵self.B_dict,初始矩阵self.Pi_dict这3个矩阵的数据。
把中间数据序列化保存成.pkl文件,需要调用第4步的save方法。
4. 类中函数save,保存状态转移矩阵self.A_dict,发射矩阵self.B_dict,初始矩阵self.Pi_dict这3个矩阵的数据到.pkl文件。
5. 类中函数viterbi,维特比算法。根据输入的句子text,进行索引,返回最优的状态序列。
6. 类中函数cut,把维特比算法中返回最优的状态序列进行识别切分。
三、代码
import pickle class HmmModel: def __init__(self): # 分词状态 self.STATE = {'B', 'M', 'E', 'S'} # 状态转移矩阵 self.A_dict = {} # 发射矩阵 self.B_dict = {} # 初始矩阵 self.Pi_dict = {} # 加载数据 先加载模型数据,没有就读取语料库重新训练 def load(self, model_file='../dataset/hmm/model.pkl', train_file='../dataset/hmm/train.txt'): # 加载模型数据 try: with open(model_file, 'rb') as f: self.A_dict = pickle.load(f) self.B_dict = pickle.load(f) self.Pi_dict = pickle.load(f) return except FileNotFoundError: pass # 统计状态出现次数 方便求发射矩阵 Count_dict = {} # 存放初始语料所有数据 data = [] # 存放初始语料中的一个句子 sentence = [] # 初始化模型参数 def init_params(): for state in self.STATE: self.A_dict[state] = {s: 0.0 for s in self.STATE} self.Pi_dict[state] = 0.0 self.B_dict[state] = {} Count_dict[state] = 0 init_params() # 读取语料库 with open(train_file, encoding='utf8') as f: # 每句按元组存在data中 for line in f: line = line.strip() word_list = [i for i in line if i != '\t'] if not line: data.append(sentence) sentence = [] else: sentence.append((word_list[0], word_list[1])) # 统计次数 for s in data: for k, v in enumerate(s): Count_dict[v[1]] += 1 if k == 0: self.Pi_dict[v[1]] += 1 # 每个句子的第一个字的状态,用于计算初始状态概率 else: self.A_dict[s[k - 1][1]][v[1]] += 1 # 计算转移概率 self.B_dict[s[k][1]][v[0]] = self.B_dict[s[k][1]].get(v[0], 0) + 1.0 # 计算发射概率 # 计算频率 self.Pi_dict = {k: v * 1.0 / len(data) for k, v in self.Pi_dict.items()} self.A_dict = {k: {k1: v1 / Count_dict[k] for k1, v1 in v.items()} for k, v in self.A_dict.items()} # 加1平滑 self.B_dict = {k: {k1: (v1 + 1) / Count_dict[k] for k1, v1 in v.items()} for k, v in self.B_dict.items()} # 把中间模型数据保存下来 self.save() # 保存中间模型数据 def save(self, model_file='../dataset/hmm/model.pkl'): # 序列化 import pickle with open(model_file, 'wb') as f: pickle.dump(self.A_dict, f) pickle.dump(self.B_dict, f) pickle.dump(self.Pi_dict, f) # 维特比算法 def viterbi(self, text): # 加载数据 self.load() # 赋别名 states, start_p, trans_p, emit_p = self.STATE, self.Pi_dict, self.A_dict, self.B_dict # 初始化顶点集、路径集 V = [{}] path = {} # 初始化第一个状态 for y in states: V[0][y] = start_p[y] * emit_p[y].get(text[0], 0) path[y] = [y] # 遍历剩下的状态 for t in range(1, len(text)): V.append({}) newpath = {} # 检验训练的发射概率矩阵中是否有该字 neverSeen = text[t] not in emit_p['S'].keys() and \ text[t] not in emit_p['M'].keys() and \ text[t] not in emit_p['E'].keys() and \ text[t] not in emit_p['B'].keys() for y in states: # 生词值为1,发射矩阵一行内词找不到为0(发射矩阵有4行) emitP = emit_p[y].get(text[t], 0) if not neverSeen else 1.0 # 设置未知字单独成词 # 在当前状态为y下,计算前一个时刻的四种状态的代价乘积,取max (prob, state) = max( [(V[t - 1][y0] * trans_p[y0].get(y, 0) * emitP, y0) for y0 in states if V[t - 1][y0] > 0]) V[t][y] = prob newpath[y] = path[state] + [y] path = newpath if emit_p['M'].get(text[-1], 0) > emit_p['S'].get(text[-1], 0): (prob, state) = max([(V[len(text) - 1][y], y) for y in ('E', 'M')]) else: (prob, state) = max([(V[len(text) - 1][y], y) for y in states]) return (prob, path[state]) def cut(self, text): prob, pos_list = self.viterbi(text) begin, next = 0, 0 for i, char in enumerate(text): pos = pos_list[i] if pos == 'B': begin = i elif pos == 'E': yield text[begin: i + 1] next = i + 1 elif pos == 'S': yield char next = i + 1 if next < len(text): yield text[next:] hmm = HmmModel() text = '人类社会前进的航船就要驶入21世纪的新航程。' res = hmm.cut(text) print(str(list(res)))
测试结果:
['人类', '社会', '前进', '的', '航船', '就', '要', '驶入', '21', '世纪', '的', '新', '航程