本文摘要 · 理论来源:【统计自然语言处理】第七章 自动分词;【统计学习方法】第十章 隐马尔可夫模型
· 代码目的:手写HMM进行中文分词
作者:CSDN 征途黯然.
一、数据集
数据集的形式如下:
新 B
华 M
社 E
北 B
京 E
二 B
月 E
十 B
二 M
日 E
电 S
中 B
国 E
十 B
四 E
……
数据集已经标注好了四种状态(B、M、E、S),每个句子之间用换行分割。
获取本数据集或者代码工程,可以关注公众号‘三黄工作室’回复‘中文分词’。
二、代码介绍
1. 定义一个类,HmmModel
。
2. 类中定义属性,分词状态self.STATE
,状态转移矩阵self.A_dict
,发射矩阵self.B_dict
,初始矩阵self.Pi_dict
。
3. 类中函数load
,先加载序列化的中间模型,如果中间模型数据不存在,则去加载语料库,重新训练,训练好了之后,把中间数据序列化保存成.pkl
文件。
这里的中间模型,指的是状态转移矩阵self.A_dict
,发射矩阵self.B_dict
,初始矩阵self.Pi_dict
这3个矩阵的数据。
把中间数据序列化保存成.pkl
文件,需要调用第4步的save
方法。
4. 类中函数save
,保存状态转移矩阵self.A_dict
,发射矩阵self.B_dict
,初始矩阵self.Pi_dict
这3个矩阵的数据到.pkl
文件。
5. 类中函数viterbi
,维特比算法。根据输入的句子text
,进行索引,返回最优的状态序列。
6. 类中函数cut
,把维特比算法中返回最优的状态序列进行识别切分。
三、代码
import pickle
class HmmModel:
def __init__(self):
# 分词状态
self.STATE = {'B', 'M', 'E', 'S'}
# 状态转移矩阵
self.A_dict = {}
# 发射矩阵
self.B_dict = {}
# 初始矩阵
self.Pi_dict = {}
# 加载数据 先加载模型数据,没有就读取语料库重新训练
def load(self, model_file='../dataset/hmm/model.pkl', train_file='../dataset/hmm/train.txt'):
# 加载模型数据
try:
with open(model_file, 'rb') as f:
self.A_dict = pickle.load(f)
self.B_dict = pickle.load(f)
self.Pi_dict = pickle.load(f)
return
except FileNotFoundError:
pass
# 统计状态出现次数 方便求发射矩阵
Count_dict = {}
# 存放初始语料所有数据
data = []
# 存放初始语料中的一个句子
sentence = []
# 初始化模型参数
def init_params():
for state in self.STATE:
self.A_dict[state] = {s: 0.0 for s in self.STATE}
self.Pi_dict[state] = 0.0
self.B_dict[state] = {}
Count_dict[state] = 0
init_params()
# 读取语料库
with open(train_file, encoding='utf8') as f:
# 每句按元组存在data中
for line in f:
line = line.strip()
word_list = [i for i in line if i != '\t']
if not line:
data.append(sentence)
sentence = []
else:
sentence.append((word_list[0], word_list[1]))
# 统计次数
for s in data:
for k, v in enumerate(s):
Count_dict[v[1]] += 1
if k == 0:
self.Pi_dict[v[1]] += 1 # 每个句子的第一个字的状态,用于计算初始状态概率
else:
self.A_dict[s[k - 1][1]][v[1]] += 1 # 计算转移概率
self.B_dict[s[k][1]][v[0]] = self.B_dict[s[k][1]].get(v[0], 0) + 1.0 # 计算发射概率
# 计算频率
self.Pi_dict = {k: v * 1.0 / len(data) for k, v in self.Pi_dict.items()}
self.A_dict = {k: {k1: v1 / Count_dict[k] for k1, v1 in v.items()} for k, v in self.A_dict.items()}
# 加1平滑
self.B_dict = {k: {k1: (v1 + 1) / Count_dict[k] for k1, v1 in v.items()} for k, v in self.B_dict.items()}
# 把中间模型数据保存下来
self.save()
# 保存中间模型数据
def save(self, model_file='../dataset/hmm/model.pkl'):
# 序列化
import pickle
with open(model_file, 'wb') as f:
pickle.dump(self.A_dict, f)
pickle.dump(self.B_dict, f)
pickle.dump(self.Pi_dict, f)
# 维特比算法
def viterbi(self, text):
# 加载数据
self.load()
# 赋别名
states, start_p, trans_p, emit_p = self.STATE, self.Pi_dict, self.A_dict, self.B_dict
# 初始化顶点集、路径集
V = [{}]
path = {}
# 初始化第一个状态
for y in states:
V[0][y] = start_p[y] * emit_p[y].get(text[0], 0)
path[y] = [y]
# 遍历剩下的状态
for t in range(1, len(text)):
V.append({})
newpath = {}
# 检验训练的发射概率矩阵中是否有该字
neverSeen = text[t] not in emit_p['S'].keys() and \
text[t] not in emit_p['M'].keys() and \
text[t] not in emit_p['E'].keys() and \
text[t] not in emit_p['B'].keys()
for y in states:
# 生词值为1,发射矩阵一行内词找不到为0(发射矩阵有4行)
emitP = emit_p[y].get(text[t], 0) if not neverSeen else 1.0 # 设置未知字单独成词
# 在当前状态为y下,计算前一个时刻的四种状态的代价乘积,取max
(prob, state) = max(
[(V[t - 1][y0] * trans_p[y0].get(y, 0) *
emitP, y0)
for y0 in states if V[t - 1][y0] > 0])
V[t][y] = prob
newpath[y] = path[state] + [y]
path = newpath
if emit_p['M'].get(text[-1], 0) > emit_p['S'].get(text[-1], 0):
(prob, state) = max([(V[len(text) - 1][y], y) for y in ('E', 'M')])
else:
(prob, state) = max([(V[len(text) - 1][y], y) for y in states])
return (prob, path[state])
def cut(self, text):
prob, pos_list = self.viterbi(text)
begin, next = 0, 0
for i, char in enumerate(text):
pos = pos_list[i]
if pos == 'B':
begin = i
elif pos == 'E':
yield text[begin: i + 1]
next = i + 1
elif pos == 'S':
yield char
next = i + 1
if next < len(text):
yield text[next:]
hmm = HmmModel()
text = '人类社会前进的航船就要驶入21世纪的新航程。'
res = hmm.cut(text)
print(str(list(res)))
测试结果:
['人类', '社会', '前进', '的', '航船', '就', '要', '驶入', '21', '世纪', '的', '新', '航程', '。']
获取本项目的源代码
如果需要本组件的源代码,请扫描关注我的公众号,回复“中文分词”即可。