一、准备数据
1.seq_example代表问题,seq_answer代表答案,数据内容如下所示:
seq_example = ["你认识我吗", "你住在哪里", "你知道我的名字吗", "你是谁", "你会唱歌吗", "你有父母吗"]
seq_answer = ["当然认识", "我住在成都", "我不知道", "我是机器人", "我不会", "我没有父母"]
2. 使用jieba对数据进行分词,并为每个词建立索引(index)。其中SOS代表句子开头,EOS代表句子结尾,PAD用于将句子补全到统一长度,词表如下:
{'你': 3, '认识': 4, '我': 5, '吗': 6, '住': 7, '在': 8, '哪里': 9, '知道': 10, '的': 11, '名字': 12, '是': 13, '谁': 14, '会': 15, '唱歌': 16, '有': 17, '父母': 18, '当然': 19, '成都': 20, '不': 21, '机器人': 22, '不会': 23, '没有': 24, 'PAD': 0, 'SOS': 1, 'EOS': 2}
3. 最后将seq_example与seq_answer分词后使用索引表示
二、模型构建
1.encoder
采用双向LSTM处理输入向量,代码如下:
class lstm_encoder(nn.Module):
    """Single-layer bidirectional LSTM encoder.

    Args:
        input_size: Feature size of each input embedding. Defaults to the
            module-level ``embedding_size``.
        hidden_size: Hidden size of each LSTM direction. Defaults to the
            module-level ``n_hidden``.
    """

    def __init__(self, input_size=None, hidden_size=None):
        super().__init__()
        # Fall back to the script-level hyper-parameters so that the
        # original zero-argument construction keeps working.
        if input_size is None:
            input_size = embedding_size
        if hidden_size is None:
            hidden_size = n_hidden
        # Bidirectional LSTM with one layer.
        self.encoder = nn.LSTM(input_size, hidden_size, 1, bidirectional=True)

    def forward(self, embedding_input):
        """Encode a batch of embedded sequences.

        Args:
            embedding_input: Tensor laid out as (seq_len, batch, input_size),
                the default (``batch_first=False``) layout of ``nn.LSTM``.

        Returns:
            A tuple ``(encoder_output, h_n, c_n)``. ``encoder_output`` is the
            raw LSTM output; ``h_n`` and ``c_n`` are the final forward and
            backward states concatenated on the feature axis, each with
            shape (1, batch, 2 * hidden_size).
        """
        encoder_output, (encoder_h_n, encoder_c_n) = self.encoder(embedding_input)
        # Index 0 holds the forward direction, index 1 the backward one;
        # join them so a unidirectional decoder of width 2 * hidden_size
        # can be initialised from the result.
        encoder_h_n = torch.cat([encoder_h_n[0], encoder_h_n[1]], dim=1)
        encoder_c_n = torch.cat([encoder_c_n[0], encoder_c_n[1]], dim=1)
        return encoder_output, encoder_h_n.unsqueeze(0), encoder_c_n.unsqueeze(0)
2.decoder + Attention
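decoder采用单向LSTM并加入Attention机制:先用decoder输出对encoder输出做Attention得到上下文向量,再将上下文向量与decoder输出拼接后送入全连接层做预测。Attention采用General打分方式,具体过程如下:
$$\mathrm{score}(h_t, \bar{h}_s) = h_t^{\top} W_a \bar{h}_s, \qquad a_t(s) = \mathrm{softmax}_s\big(\mathrm{score}(h_t, \bar{h}_s)\big)$$
$$c_t = \sum_s a_t(s)\,\bar{h}_s, \qquad \tilde{h}_t = \tanh\big(W_c\,[c_t; h_t]\big)$$
其中 $h_t$ 为decoder第 $t$ 步的输出,$\bar{h}_s$ 为encoder第 $s$ 个位置的输出,$\tilde{h}_t$ 再经过全连接层得到各个词的预测分数(代码中的线性层还带有偏置项)。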
代码如下:
class lstm_decoder(nn.Module):
    """Unidirectional LSTM decoder with "general" (bilinear) attention.

    Args:
        input_size: Feature size of each input embedding. Defaults to the
            module-level ``embedding_size``.
        hidden_size: Hidden size of ONE encoder direction; the decoder LSTM
            itself is ``2 * hidden_size`` wide so it can consume the
            concatenated bidirectional encoder state. Defaults to the
            module-level ``n_hidden``.
        output_size: Number of output classes. Defaults to the module-level
            ``num_classes``.
    """

    def __init__(self, input_size=None, hidden_size=None, output_size=None):
        super().__init__()
        # Fall back to the script-level hyper-parameters so that the
        # original zero-argument construction keeps working.
        if input_size is None:
            input_size = embedding_size
        if hidden_size is None:
            hidden_size = n_hidden
        if output_size is None:
            output_size = num_classes
        state_size = hidden_size * 2
        # Unidirectional LSTM.
        self.decoder = nn.LSTM(input_size, state_size, 1)
        # Attention weight applied to the encoder outputs before scoring.
        self.att_weight = nn.Linear(state_size, state_size)
        # Projects the concatenated [context; decoder output] back down.
        self.att_joint = nn.Linear(state_size * 2, state_size)
        # Final classification layer.
        self.fc = nn.Linear(state_size, output_size)

    def forward(self, input_x, encoder_output, hn, cn):
        """Run the decoder over ``input_x`` while attending to the encoder.

        Args:
            input_x: Embedded decoder inputs, (dec_len, batch, input_size).
            encoder_output: Encoder outputs, (enc_len, batch, 2 * hidden_size).
            hn: Initial hidden state, (1, batch, 2 * hidden_size).
            cn: Initial cell state, (1, batch, 2 * hidden_size).

        Returns:
            A tuple ``(logits, h_n, c_n)`` where ``logits`` is batch-first,
            (batch, dec_len, output_size), and ``h_n`` / ``c_n`` are the
            decoder LSTM's final states.
        """
        decoder_output, (decoder_h_n, decoder_c_n) = self.decoder(input_x, (hn, cn))
        # Switch to batch-first so that bmm can be used below.
        decoder_output = decoder_output.permute(1, 0, 2)
        encoder_output = encoder_output.permute(1, 0, 2)
        # Transformed encoder outputs, transposed to (batch, features, enc_len).
        projected_encoder = self.att_weight(encoder_output).permute(0, 2, 1)
        # Attention scores: (batch, dec_len, enc_len).
        decoder_output_score = decoder_output.bmm(projected_encoder)
        # Attention weights, normalised over the encoder positions.
        at = nn.functional.softmax(decoder_output_score, dim=2)
        # Context vectors: weighted sum of the encoder outputs.
        ct = at.bmm(encoder_output)
        # Concatenate context and decoder output, then project and classify.
        ht_joint = torch.cat((ct, decoder_output), dim=2)
        fc_joint = torch.tanh(self.att_joint(ht_joint))
        fc_out = self.fc(fc_joint)
        return fc_out, decoder_h_n, decoder_c_n
三、具体代码
import torch
import torch.nn as nn
import torch.optim as optim
import jieba
import os
# Toy corpus: the i-th answer is the reply to the i-th question.
seq_example = ["你认识我吗", "你住在哪里", "你知道我的名字吗", "你是谁", "你会唱歌吗", "你有父母吗"]
seq_answer = ["当然认识", "我住在成都", "我不知道", "我是机器人", "我不会", "我没有父母"]

# Tokenise every sentence with jieba.
example_cut = [list(jieba.cut(sentence)) for sentence in seq_example]
answer_cut = [list(jieba.cut(sentence)) for sentence in seq_answer]

# Distinct tokens in order of first appearance (questions, then answers).
word_all = list(dict.fromkeys(
    token for sentence in example_cut + answer_cut for token in sentence
))

# Token -> index table; indices 0-2 are reserved for the special tokens.
word2index = {token: index for index, token in enumerate(word_all, start=3)}
word2index.update({
    'PAD': 0,  # padding
    'SOS': 1,  # start of sentence
    'EOS': 2,  # end of sentence
})
index2word = dict(zip(word2index.values(), word2index.keys()))

# Hyper-parameters.
vocab_size = len(word2index)
# Longest tokenised sentence plus one slot for SOS / EOS.
seq_length = max(map(len, example_cut + answer_cut)) + 1
embedding_size = 5
num_classes = vocab_size
n_hidden = 10
# Represent sentences as index sequences.
def make_data(seq_list):
    """Convert tokenised sentences into right-padded lists of word indices.

    Each token is looked up in the module-level ``word2index`` (an unknown
    token raises ``KeyError``). Sentences shorter than the module-level
    ``seq_length`` are padded on the right with 0 (the PAD index); longer
    sentences are returned unpadded and untruncated.

    Args:
        seq_list: Iterable of sentences, each an iterable of tokens.

    Returns:
        A list with one list of integer indices per input sentence.
    """
    indexed = ([word2index[token] for token in sentence] for sentence in seq_list)
    # A non-positive repeat count yields [], so full-length or over-long
    # sentences pass through unchanged.
    return [ids + [0] * (seq_length - len(ids)) for ids in indexed]
# Training tensors. The decoder input is the answer prefixed with SOS and
# the decoder target is the same answer suffixed with EOS.
encoder_input = torch.LongTensor(make_data(example_cut))
decoder_input = torch.LongTensor(
    make_data([['SOS'] + tokens for tokens in answer_cut])
)
decoder_target = torch.LongTensor(
    make_data([tokens + ['EOS'] for tokens in answer_cut])
)
# Encoder model.
class lstm_encoder(nn.Module):
    """Single-layer bidirectional LSTM encoder.

    Args:
        input_size: Feature size of each input embedding. Defaults to the
            module-level ``embedding_size``.
        hidden_size: Hidden size of each LSTM direction. Defaults to the
            module-level ``n_hidden``.
    """

    def __init__(self, input_size=None, hidden_size=None):
        super().__init__()
        # Fall back to the script-level hyper-parameters so that the
        # original zero-argument construction keeps working.
        if input_size is None:
            input_size = embedding_size
        if hidden_size is None:
            hidden_size = n_hidden
        # Bidirectional LSTM with one layer.
        self.encoder = nn.LSTM(input_size, hidden_size, 1, bidirectional=True)

    def forward(self, embedding_input):
        """Encode a batch of embedded sequences.

        Args:
            embedding_input: Tensor laid out as (seq_len, batch, input_size),
                the default (``batch_first=False``) layout of ``nn.LSTM``.

        Returns:
            A tuple ``(encoder_output, h_n, c_n)``. ``encoder_output`` is the
            raw LSTM output; ``h_n`` and ``c_n`` are the final forward and
            backward states concatenated on the feature axis, each with
            shape (1, batch, 2 * hidden_size).
        """
        encoder_output, (encoder_h_n, encoder_c_n) = self.encoder(embedding_input)
        # Index 0 holds the forward direction, index 1 the backward one;
        # join them so a unidirectional decoder of width 2 * hidden_size
        # can be initialised from the result.
        encoder_h_n = torch.cat([encoder_h_n[0], encoder_h_n[1]], dim=1)
        encoder_c_n = torch.cat([encoder_c_n[0], encoder_c_n[1]], dim=1)
        return encoder_output, encoder_h_n.unsqueeze(0), encoder_c_n.unsqueeze(0)
# Attention decoder model.
class lstm_decoder(nn.Module):
    """Unidirectional LSTM decoder with "general" (bilinear) attention.

    Args:
        input_size: Feature size of each input embedding. Defaults to the
            module-level ``embedding_size``.
        hidden_size: Hidden size of ONE encoder direction; the decoder LSTM
            itself is ``2 * hidden_size`` wide so it can consume the
            concatenated bidirectional encoder state. Defaults to the
            module-level ``n_hidden``.
        output_size: Number of output classes. Defaults to the module-level
            ``num_classes``.
    """

    def __init__(self, input_size=None, hidden_size=None, output_size=None):
        super().__init__()
        # Fall back to the script-level hyper-parameters so that the
        # original zero-argument construction keeps working.
        if input_size is None:
            input_size = embedding_size
        if hidden_size is None:
            hidden_size = n_hidden
        if output_size is None:
            output_size = num_classes
        state_size = hidden_size * 2
        # Unidirectional LSTM.
        self.decoder = nn.LSTM(input_size, state_size, 1)
        # Attention weight applied to the encoder outputs before scoring.
        self.att_weight = nn.Linear(state_size, state_size)
        # Projects the concatenated [context; decoder output] back down.
        self.att_joint = nn.Linear(state_size * 2, state_size)
        # Final classification layer.
        self.fc = nn.Linear(state_size, output_size)

    def forward(self, input_x, encoder_output, hn, cn):
        """Run the decoder over ``input_x`` while attending to the encoder.

        Args:
            input_x: Embedded decoder inputs, (dec_len, batch, input_size).
            encoder_output: Encoder outputs, (enc_len, batch, 2 * hidden_size).
            hn: Initial hidden state, (1, batch, 2 * hidden_size).
            cn: Initial cell state, (1, batch, 2 * hidden_size).

        Returns:
            A tuple ``(logits, h_n, c_n)`` where ``logits`` is batch-first,
            (batch, dec_len, output_size), and ``h_n`` / ``c_n`` are the
            decoder LSTM's final states.
        """
        decoder_output, (decoder_h_n, decoder_c_n) = self.decoder(input_x, (hn, cn))
        # Switch to batch-first so that bmm can be used below.
        decoder_output = decoder_output.permute(1, 0, 2)
        encoder_output = encoder_output.permute(1, 0, 2)
        # Transformed encoder outputs, transposed to (batch, features, enc_len).
        projected_encoder = self.att_weight(encoder_output).permute(0, 2, 1)
        # Attention scores: (batch, dec_len, enc_len).
        decoder_output_score = decoder_output.bmm(projected_encoder)
        # Attention weights, normalised over the encoder positions.
        at = nn.functional.softmax(decoder_output_score, dim=2)
        # Context vectors: weighted sum of the encoder outputs.
        ct = at.bmm(encoder_output)
        # Concatenate context and decoder output, then project and classify.
        ht_joint = torch.cat((ct, decoder_output), dim=2)
        fc_joint = torch.tanh(self.att_joint(ht_joint))
        fc_out = self.fc(fc_joint)
        return fc_out, decoder_h_n, decoder_c_n
class seq2seq(nn.Module):
    """Encoder-decoder model with a shared embedding table.

    The embedding is sized from the module-level ``vocab_size`` and
    ``embedding_size``; the encoder and decoder are ``lstm_encoder`` and
    ``lstm_decoder`` instances.
    """

    # Token ids that end greedy decoding: 0 is PAD and 2 is EOS in the
    # script's word2index table.
    _STOP_TOKENS = (0, 2)

    def __init__(self):
        super().__init__()
        self.word_vec = nn.Embedding(vocab_size, embedding_size)
        # encoder
        self.seq2seq_encoder = lstm_encoder()
        # decoder
        self.seq2seq_decoder = lstm_decoder()

    def forward(self, encoder_input, decoder_input, inference_threshold=0):
        """Run a teacher-forced training pass or greedy decoding.

        Args:
            encoder_input: LongTensor of token ids, (batch, enc_len).
            decoder_input: LongTensor of token ids, (batch, dec_len). For
                decoding this is the single start token, shape (1, 1).
            inference_threshold: Despite its name, a TRUTHY value selects
                the teacher-forced training path; the default (0) selects
                greedy decoding.

        Returns:
            Training path: the decoder logits for every position of
            ``decoder_input``. Decoding path: a list of predicted token ids,
            stopping before the first PAD/EOS prediction or after the
            module-level ``seq_length`` steps.
        """
        # Embed, then swap the first two axes: nn.LSTM expects (seq, batch, feat).
        embedding_encoder_input = self.word_vec(encoder_input).permute(1, 0, 2)
        embedding_decoder_input = self.word_vec(decoder_input).permute(1, 0, 2)
        # encoder
        encoder_output, h_n, c_n = self.seq2seq_encoder(embedding_encoder_input)

        if inference_threshold:
            # Training: feed the whole target sequence at once.
            decoder_output, h_n, c_n = self.seq2seq_decoder(
                embedding_decoder_input, encoder_output, h_n, c_n
            )
            return decoder_output

        # Decoding: feed the previous prediction back in, one token at a time.
        outputs = []
        for _ in range(seq_length):
            decoder_output, h_n, c_n = self.seq2seq_decoder(
                embedding_decoder_input, encoder_output, h_n, c_n
            )
            # Flatten on the real class dimension instead of a hard-coded
            # 25, so the model works for any vocabulary size.
            logits = decoder_output.reshape(-1, decoder_output.size(-1))
            # .item() requires a single prediction (batch size 1, one step).
            decoder_x = torch.max(logits, dim=1)[1].item()
            if decoder_x in self._STOP_TOKENS:
                break
            outputs.append(decoder_x)
            # Build the next input on the same device as the model inputs.
            next_token = torch.tensor(
                [[decoder_x]], dtype=torch.long, device=encoder_input.device
            )
            embedding_decoder_input = self.word_vec(next_token).permute(1, 0, 2)
        return outputs
model = seq2seq()
print(model)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.05)

# Reuse a saved checkpoint when one exists; otherwise train and save one.
if os.path.exists("./seq2seqModel.pkl"):
    model.load_state_dict(torch.load('./seq2seqModel.pkl'))
else:
    # Training (teacher forcing: the third argument selects the training path).
    model.train()
    for epoch in range(10000):
        pred = model(encoder_input, decoder_input, 1)
        # Flatten on the actual class dimension rather than a hard-coded 25,
        # which only held for one particular tokenisation of the corpus.
        loss = criterion(pred.reshape(-1, pred.size(-1)), decoder_target.view(-1))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        if (epoch + 1) % 1000 == 0:
            print("Epoch: %d, loss: %.5f " % (epoch + 1, loss.item()))
    # Save the trained weights.
    torch.save(model.state_dict(), './seq2seqModel.pkl')

# Test: greedy-decode an answer for one question.
model.eval()
question_text = '你住在哪里'
question_cut = list(jieba.cut(question_text))
encoder_x = make_data([question_cut])
decoder_x = [[word2index['SOS']]]
encoder_x, decoder_x = torch.LongTensor(encoder_x), torch.LongTensor(decoder_x)
# No gradients are needed for inference.
with torch.no_grad():
    out = model(encoder_x, decoder_x)
answer = ''.join(index2word[i] for i in out)
print('问题:', question_text)
print('回答:', answer)