【自然语言处理】hmm隐马尔可夫模型进行中文分词 代码

本文摘要
· 理论来源:【统计自然语言处理】第七章 自动分词;【统计学习方法】第十章 隐马尔可夫模型
· 代码目的:手写HMM进行中文分词
作者:CSDN 征途黯然.

一、数据集

  数据集的形式如下:

新	B
华	M
社	E
北	B
京	E
二	B
月	E
十	B
二	M
日	E
电	S

中	B
国	E
十	B
四	E
……

  数据集已经标注好了四种状态(B、M、E、S),每个句子之间用换行分割。

  获取本数据集或者代码工程,可以关注公众号‘三黄工作室’回复‘中文分词’。

二、代码介绍

  1. 定义一个类,HmmModel

  2. 类中定义属性,分词状态self.STATE,状态转移矩阵self.A_dict,发射矩阵self.B_dict,初始矩阵self.Pi_dict

  3. 类中函数load,先加载序列化的中间模型,如果中间模型数据不存在,则去加载语料库,重新训练,训练好了之后,把中间数据序列化保存成.pkl文件。
  这里的中间模型,指的是状态转移矩阵self.A_dict,发射矩阵self.B_dict,初始矩阵self.Pi_dict这3个矩阵的数据。
  把中间数据序列化保存成.pkl文件,需要调用第4步的save方法。

  4. 类中函数save,保存状态转移矩阵self.A_dict,发射矩阵self.B_dict,初始矩阵self.Pi_dict这3个矩阵的数据到.pkl文件。

  5. 类中函数viterbi,维特比算法。根据输入的句子text,进行索引,返回最优的状态序列。

  6. 类中函数cut,把维特比算法中返回最优的状态序列进行识别切分。

三、代码

import pickle


class HmmModel:

    def __init__(self):
        # 分词状态
        self.STATE = {'B', 'M', 'E', 'S'}
        # 状态转移矩阵
        self.A_dict = {}
        # 发射矩阵
        self.B_dict = {}
        # 初始矩阵
        self.Pi_dict = {}

    # 加载数据 先加载模型数据,没有就读取语料库重新训练
    def load(self, model_file='../dataset/hmm/model.pkl', train_file='../dataset/hmm/train.txt'):

        # 加载模型数据
        try:
            with open(model_file, 'rb') as f:
                self.A_dict = pickle.load(f)
                self.B_dict = pickle.load(f)
                self.Pi_dict = pickle.load(f)
                return
        except FileNotFoundError:
            pass

        # 统计状态出现次数 方便求发射矩阵
        Count_dict = {}
        # 存放初始语料所有数据
        data = []
        # 存放初始语料中的一个句子
        sentence = []

        # 初始化模型参数
        def init_params():
            for state in self.STATE:
                self.A_dict[state] = {s: 0.0 for s in self.STATE}
                self.Pi_dict[state] = 0.0
                self.B_dict[state] = {}
                Count_dict[state] = 0

        init_params()

        # 读取语料库
        with open(train_file, encoding='utf8') as f:
            # 每句按元组存在data中
            for line in f:
                line = line.strip()
                word_list = [i for i in line if i != '\t']
                if not line:
                    data.append(sentence)
                    sentence = []
                else:
                    sentence.append((word_list[0], word_list[1]))

            # 统计次数
            for s in data:
                for k, v in enumerate(s):
                    Count_dict[v[1]] += 1
                    if k == 0:
                        self.Pi_dict[v[1]] += 1  # 每个句子的第一个字的状态,用于计算初始状态概率
                    else:
                        self.A_dict[s[k - 1][1]][v[1]] += 1  # 计算转移概率
                        self.B_dict[s[k][1]][v[0]] = self.B_dict[s[k][1]].get(v[0], 0) + 1.0  # 计算发射概率

            # 计算频率
            self.Pi_dict = {k: v * 1.0 / len(data) for k, v in self.Pi_dict.items()}
            self.A_dict = {k: {k1: v1 / Count_dict[k] for k1, v1 in v.items()} for k, v in self.A_dict.items()}
            # 加1平滑
            self.B_dict = {k: {k1: (v1 + 1) / Count_dict[k] for k1, v1 in v.items()} for k, v in self.B_dict.items()}

            # 把中间模型数据保存下来
            self.save()

    # 保存中间模型数据
    def save(self, model_file='../dataset/hmm/model.pkl'):
        # 序列化
        import pickle
        with open(model_file, 'wb') as f:
            pickle.dump(self.A_dict, f)
            pickle.dump(self.B_dict, f)
            pickle.dump(self.Pi_dict, f)

    # 维特比算法
    def viterbi(self, text):
        # 加载数据
        self.load()
        # 赋别名
        states, start_p, trans_p, emit_p = self.STATE, self.Pi_dict, self.A_dict, self.B_dict
        # 初始化顶点集、路径集
        V = [{}]
        path = {}
        # 初始化第一个状态
        for y in states:
            V[0][y] = start_p[y] * emit_p[y].get(text[0], 0)
            path[y] = [y]

        # 遍历剩下的状态
        for t in range(1, len(text)):
            V.append({})
            newpath = {}

            # 检验训练的发射概率矩阵中是否有该字
            neverSeen = text[t] not in emit_p['S'].keys() and \
                        text[t] not in emit_p['M'].keys() and \
                        text[t] not in emit_p['E'].keys() and \
                        text[t] not in emit_p['B'].keys()

            for y in states:
                # 生词值为1,发射矩阵一行内词找不到为0(发射矩阵有4行)
                emitP = emit_p[y].get(text[t], 0) if not neverSeen else 1.0  # 设置未知字单独成词

                # 在当前状态为y下,计算前一个时刻的四种状态的代价乘积,取max
                (prob, state) = max(
                    [(V[t - 1][y0] * trans_p[y0].get(y, 0) *
                      emitP, y0)
                     for y0 in states if V[t - 1][y0] > 0])

                V[t][y] = prob

                newpath[y] = path[state] + [y]
            path = newpath

        if emit_p['M'].get(text[-1], 0) > emit_p['S'].get(text[-1], 0):
            (prob, state) = max([(V[len(text) - 1][y], y) for y in ('E', 'M')])
        else:
            (prob, state) = max([(V[len(text) - 1][y], y) for y in states])

        return (prob, path[state])

    def cut(self, text):
        prob, pos_list = self.viterbi(text)
        begin, next = 0, 0
        for i, char in enumerate(text):
            pos = pos_list[i]
            if pos == 'B':
                begin = i
            elif pos == 'E':
                yield text[begin: i + 1]
                next = i + 1
            elif pos == 'S':
                yield char
                next = i + 1
        if next < len(text):
            yield text[next:]



hmm = HmmModel()
text = '人类社会前进的航船就要驶入21世纪的新航程。'
res = hmm.cut(text)
print(str(list(res)))


  测试结果:

['人类', '社会', '前进', '的', '航船', '就', '要', '驶入', '21', '世纪', '的', '新', '航程', '。']

获取本项目的源代码

如果需要本组件的源代码,请扫描关注我的公众号,回复“中文分词”即可。
【自然语言处理】hmm隐马尔可夫模型进行中文分词 代码

上一篇:HMM隐马尔可夫模型进行中文文本分词


下一篇:【史诗级干货长文】HMM模型