1.viterbi算法的原理
维特比算法是一种动态规划算法,比如在有向无环图中,维特比算法到达每一列都会删除不符合最短路径要求的路径,大大降低了时间复杂度。
算法的思路(和李航的统计学习方法上一样)
输入:模型\(\lambda = (A,B,\pi)\) 和观测\(O = (O_1,O_2,...,O_T)\)
(1)初始化
(2) 递推 对t = 2,3,...,T
\[\sigma_{t}(i) = max(\sigma_{t-1}(j)a_{ji}) ~~1<=j<=N~~~~~ i = 1,2,...N \] \[\phi_{t}(i) = argmax[\sigma_{t-1}(j)a_{ji}] ~ 1<=j<=N \]上式中\(max[\sigma_{t-1}(j)a_{ji}]\)其计算的思路是t-1时刻所有可能的隐藏状态的概率(j表示)通过转移状态矩阵转到i状态的最大值
(3)终止
上式\(i_{T}\)表示在最后一个时间T下取最大值的索引
(4) 最优路径回溯
2.以一个小例子的python实现来说明上述过程
import numpy as np
def viterbi(A,B,pi,o):
"""
:param A: 表示状态转移矩阵
:param B: 表示观测概率矩阵
:param pi: 初始概率
:param o: 观测序列
:return:
"""
N = len(A[0]) # 状态的个数,即有多少个状态
T = len(o) # 观测的时间段有多长
delta = [[0]*N for _ in range(T)] # 用来存储每一时间下,各隐藏状态的概率
psi = [[0]*N for _ in range(T)] # 用来存储每一时间下,取最大值的的那个状态
# init
for i in range(N):
delta[0][i] = pi[i]*B[i][o[0]] # 初始条件下(t=0)的各个状态的概率就等于初始概率乘以观测概率矩阵的第一列下各概率的值
psi[0][i] = 0 # 初始的最大值状态全部设为0
# iter
for t in range(1,T):
for i in range(N):
temp,max_index = 0,0
for j in range(N):
res = delta[t-1][j]*A[j][i]
if res > temp:
temp = res
max_index = j # 保存取最大值时的索引
delta[t][i] = temp*B[i][o[t]]
psi[t][i] = max_index # 每个i循环时都有一个max_index
# end
p = max(delta[-1])
for i in range(N):
if delta[-1][i] == p:
i_T = i
# backtrack
path = [0]*T
i_t = i_T
for t in reversed(range(T-1)):
i_t = psi[t+1][i_t]
path[t] = i_t
path[-1] = i_T
return delta,psi,path
A = [[0.5,0.2,0.3],[0.3,0.5,0.2],[0.2,0.3,0.5]]
B = [[0.5,0.5],[0.4,0.6],[0.7,0.3]]
pi = [0.2,0.4,0.4]
o = [0,1,0]
delta,psi,path = viterbi(A,B,pi,o)
print(np.array(delta))
print(psi)
print(path) # 这里的结果2是索引,表示的就是3
运行的结果为:
[[0.1 0.16 0.28 ]
[0.028 0.0504 0.042 ]
[0.00756 0.01008 0.0147 ]]
[[0, 0, 0], [2, 2, 2], [1, 1, 2]]
[2, 2, 2]
3 以矩阵的形式来表示上述算法的实现过程
# np.multipy表示矩阵的点乘
def viterbi(A,B,pi,o):
A = np.array(A)
B = np.array(B)
pi = np.array(pi).reshape(1,len(pi))
o = np.array(o).reshape(1,len(o))
T = o.shape[1]
N = A.shape[0]
delta = np.zeros((T,N))
psi = np.zeros((T,N))
# 初始
delta[0,:] = np.multiply(pi,B[:,0])
# 迭代
for t in range(1,T):
for i in range(N):
temp = list(np.multiply(delta[t-1,:],A[:,i].T))
delta[t,i] = max(temp) * B[i,o[0,t]]
psi[t,i] = temp.index(max(temp))
# 结束
end = list(delta[-1,:]).index(max(delta[-1,:]))
# 回溯
i_t =end
path = [0]*T
for t in reversed(range(T-1)):
i_t = int(psi[t+1,i_t])
path[t] = i_t
path[-1] = end
return delta,psi,path
A = [[0.5,0.2,0.3],[0.3,0.5,0.2],[0.2,0.3,0.5]]
B = [[0.5,0.5],[0.4,0.6],[0.7,0.3]]
pi = [0.2,0.4,0.4]
o = [0,1,0]
delta,psi,path = viterbi(A,B,pi,o)
print(np.array(delta))
print(psi)
print(path) # 这里的结果2是索引,表示的就是3
4 利用维特比算法来解码,这里参考了jieba分词中自带的转移状态矩阵和观测矩阵(发射矩阵)
"""
start:初始概率分布,就是第一个字的状态的概率
tran: 状态转移概率,从当前状态到下一个状态的转移的概率
emit: 发射概率,表示在某一状态下生成某个观测状态的概率
"""
import sys
import re
import getopt
sys.path.append('.\\finalseg') # 将同级目录finalseg加入到路径中
MIN_FLOAT = -3.14e100
prob_start_p = "prob_start.p"
prob_trans_p = "prob_trans.p"
prob_emit_p = "prob_emit.p"
# 某一个词的状态为key时,prevstatus表示前一个词的状态的框定范围
PrevStatus = {
'B':'ES',
'M':'MB',
'S':'SE',
'E':'BM'
}
Force_Split_Words = set([])
from prob_start import P as start_p
from prob_trans import P as trans_p
from prob_emit import P as emit_p
def viterbi(obs,states,start_p,trans_p,emit_p):
V = [{}] # 这里的v就相当于原始的delta
path = {}
for y in states: # init 获取这一句子的初始状态分布,可以理解为第一个字为上述4种状态的概率
V[0][y] = start_p[y] + emit_p[y].get(obs[0],MIN_FLOAT) # 概率相乘转变为对数相加
path[y] = [y]
# 对之后每一个字做状态转移概率的分析
for t in range(1,len(obs)):
V.append({})
newpath = {}
# 考察当前字对上一字的发射概率,取其中最大的那个
for y in states:
# 获取当前词在y状态下的发射概率
em_p = emit_p[y].get(obs[t],MIN_FLOAT) #这个就相当于B[i][o[t]]
# state表示前一个字到当前字的y状态的最大概率,prob表示这个概率
(prob,state) = max(
[
(V[t-1][y0] + trans_p[y0].get(y,MIN_FLOAT) + em_p ,y0)
for y0 in PrevStatus[y]
]
)
# 得到当前字的所有可能观测状态的最大概率值
V[t][y] = prob
# 更新路径,state表示当前字的前一个字到当前字的y状态的最大可能概率,所以是path[state]
newpath[y] = path[state] + [y]
path = newpath
# 最后一个要重新复盘,因为最后一个字智能是E or S
(prob,state) = max((V[len(obs) - 1][y],y) for y in 'ES')
for i in path:
print((i,path[i]))
for v_i in V:
print((V))
return (prob,path[state])
def _cut(sentence): # 这个应该是解码
prob,pos_list = viterbi(sentence,'BMES',start_p,trans_p,emit_p)
begin,nexti = 0,0
for i,char in enumerate(sentence):
pos = pos_list[i]
if pos == 'B':
begin = i
elif pos == 'E':
yield sentence[begin:i+1]
nexti = i+1
elif pos == 'S':
yield char
nexti = i+1
if nexti < len(sentence):
yield sentence[nexti:]
re_han = re.compile("([\u4E00-\u9FD5]+)") # 匹配所有中文
re_skip = re.compile("([a-zA-Z0-9]+(?:\.\d+)?%?)")
def cut(sentence):
sentence = sentence.strip()
blocks = re_han.split(sentence)
lseg = []
for blk in blocks:
if re_han.match(blk):
for word in _cut(blk):
if word not in Force_Split_Words:
lseg.append(word)
else:
for c in word:
lseg.append(c)
else:
tmp = re_skip.split(blk)
for x in tmp:
if x:
lseg.append(x)
return lseg
if __name__=='__main__':
sen = "航天2院706所的全称是北京计算机技术及应用研究所"
rs = cut(sen)
print(" ".join(rs))
运行结果为:
航天 2 院 706 所 的 全称 是 北京 计算机 技术 及 应用 研究 所