Pytorch简单实现seq2seq+Attention机器人问答

一、准备数据

1.seq_example代表问题,seq_answer代表答案,数据内容如下所示:

seq_example = ["你认识我吗", "你住在哪里", "你知道我的名字吗", "你是谁", "你会唱歌吗", "你有父母吗"]
seq_answer = ["当然认识", "我住在成都", "我不知道", "我是机器人", "我不会", "我没有父母"]

2.将数据进行jieba分词并加入索引index,其中SOS代表单词开头,EOS代表单词结尾,PAD补全,数据如下:

{'你': 3, '认识': 4, '我': 5, '吗': 6, '住': 7, '在': 8, '哪里': 9, '知道': 10, '的': 11, '名字': 12, '是': 13, '谁': 14, '会': 15, '唱歌': 16, '有': 17, '父母': 18, '当然': 19, '成都': 20, '不': 21, '机器人': 22, '不会': 23, '没有': 24, 'PAD': 0, 'SOS': 1, 'EOS': 2}

3. 最后将seq_example与seq_answer分词后使用索引表示

二、模型构建

1.encoder

采用双向LSTM处理输入向量,代码如下:

class lstm_encoder(nn.Module):
    def __init__(self):
        super(lstm_encoder, self).__init__()
#         双向LSTM
        self.encoder = nn.LSTM(embedding_size, n_hidden, 1, bidirectional=True)

    def forward(self, embedding_input):
        encoder_output, (encoder_h_n, encoder_c_n) = self.encoder(embedding_input)
        # 拼接前向和后向最后一个隐层
        encoder_h_n = torch.cat([encoder_h_n[0], encoder_h_n[1]], dim=1)
        encoder_c_n = torch.cat([encoder_c_n[0], encoder_c_n[1]], dim=1)
        return encoder_output, encoder_h_n.unsqueeze(0), encoder_c_n.unsqueeze(0)

2.decoder + Attention

decoder采用单向LSTM并加入Attention机制,即将decoder输出与encoder输出通过Atention拼接后进入全连接层做预测,Attention机制采用的General方式,具体过程如下所示:

 

Pytorch简单实现seq2seq+Attention机器人问答

 代码如下:

class lstm_decoder(nn.Module):
    def __init__(self):
        super(lstm_decoder, self).__init__()
        # 单向LSTM
        self.decoder = nn.LSTM(embedding_size, n_hidden * 2, 1)
        # attention参数
        self.att_weight = nn.Linear(n_hidden * 2, n_hidden * 2)
        # attention_joint参数
        self.att_joint = nn.Linear(n_hidden * 4, n_hidden * 2)
        # 定义全连接层
        self.fc = nn.Linear(n_hidden * 2, num_classes)

    def forward(self, input_x, encoder_output, hn, cn):
        decoder_output, (decoder_h_n, decoder_c_n) = self.decoder(input_x, (hn, cn))
        decoder_output = decoder_output.permute(1, 0, 2)
        encoder_output = encoder_output.permute(1, 0, 2)
        decoder_output_att = self.att_weight(encoder_output)
        decoder_output_att = decoder_output_att.permute(0, 2, 1)
        # 计算分数score
        decoder_output_score = decoder_output.bmm(decoder_output_att)
        # 计算权重at
        at = nn.functional.softmax(decoder_output_score, dim=2)
        # 计算新的context向量ct
        ct = at.bmm(encoder_output)
        # 拼接ct和decoder_ht
        ht_joint = torch.cat((ct, decoder_output), dim=2)
        fc_joint = torch.tanh(self.att_joint(ht_joint))
        fc_out = self.fc(fc_joint)
        return fc_out, decoder_h_n, decoder_c_n

三、具体代码

import torch
import torch.nn as nn
import torch.optim as optim
import jieba
import os


seq_example = ["你认识我吗", "你住在哪里", "你知道我的名字吗", "你是谁", "你会唱歌吗", "你有父母吗"]
seq_answer = ["当然认识", "我住在成都", "我不知道", "我是机器人", "我不会", "我没有父母"]
# 所有词
example_cut = []
answer_cut = []
word_all = []
# 分词
for i in seq_example:
    example_cut.append(list(jieba.cut(i)))
for i in seq_answer:
    answer_cut.append(list(jieba.cut(i)))
#   所有词
for i in example_cut + answer_cut:
    for word in i:
        if word not in word_all:
            word_all.append(word)
# 词语索引表
word2index = {w: i+3 for i, w in enumerate(word_all)}
# 补全
word2index['PAD'] = 0
# 句子开始
word2index['SOS'] = 1
# 句子结束
word2index['EOS'] = 2
index2word = {value: key for key, value in word2index.items()}
# 一些参数
vocab_size = len(word2index)
seq_length = max([len(i) for i in example_cut + answer_cut]) + 1
embedding_size = 5
num_classes = vocab_size
n_hidden = 10

# 将句子用索引表示
def make_data(seq_list):
    result = []
    for word in seq_list:
        seq_index = [word2index[i] for i in word]
        if len(seq_index) < seq_length:
            seq_index += [0] * (seq_length - len(seq_index))
        result.append(seq_index)
    return result
encoder_input = make_data(example_cut)
decoder_input = make_data([['SOS'] + i for i in answer_cut])
decoder_target = make_data([i + ['EOS'] for i in answer_cut])
# 训练数据
encoder_input, decoder_input, decoder_target = torch.LongTensor(encoder_input), torch.LongTensor(decoder_input), torch.LongTensor(decoder_target)


# 建立encoder模型
class lstm_encoder(nn.Module):
    def __init__(self):
        super(lstm_encoder, self).__init__()
#         双向LSTM
        self.encoder = nn.LSTM(embedding_size, n_hidden, 1, bidirectional=True)

    def forward(self, embedding_input):
        encoder_output, (encoder_h_n, encoder_c_n) = self.encoder(embedding_input)
        # 拼接前向和后向最后一个隐层
        encoder_h_n = torch.cat([encoder_h_n[0], encoder_h_n[1]], dim=1)
        encoder_c_n = torch.cat([encoder_c_n[0], encoder_c_n[1]], dim=1)
        return encoder_output, encoder_h_n.unsqueeze(0), encoder_c_n.unsqueeze(0)


# 建立attention_decoder模型
class lstm_decoder(nn.Module):
    def __init__(self):
        super(lstm_decoder, self).__init__()
        # 单向LSTM
        self.decoder = nn.LSTM(embedding_size, n_hidden * 2, 1)
        # attention参数
        self.att_weight = nn.Linear(n_hidden * 2, n_hidden * 2)
        # attention_joint参数
        self.att_joint = nn.Linear(n_hidden * 4, n_hidden * 2)
        # 定义全连接层
        self.fc = nn.Linear(n_hidden * 2, num_classes)

    def forward(self, input_x, encoder_output, hn, cn):
        decoder_output, (decoder_h_n, decoder_c_n) = self.decoder(input_x, (hn, cn))
        decoder_output = decoder_output.permute(1, 0, 2)
        encoder_output = encoder_output.permute(1, 0, 2)
        decoder_output_att = self.att_weight(encoder_output)
        decoder_output_att = decoder_output_att.permute(0, 2, 1)
        # 计算分数score
        decoder_output_score = decoder_output.bmm(decoder_output_att)
        # 计算权重at
        at = nn.functional.softmax(decoder_output_score, dim=2)
        # 计算新的context向量ct
        ct = at.bmm(encoder_output)
        # 拼接ct和decoder_ht
        ht_joint = torch.cat((ct, decoder_output), dim=2)
        fc_joint = torch.tanh(self.att_joint(ht_joint))
        fc_out = self.fc(fc_joint)
        return fc_out, decoder_h_n, decoder_c_n


class seq2seq(nn.Module):
    def __init__(self):
        super(seq2seq, self).__init__()
        self.word_vec = nn.Embedding(vocab_size, embedding_size)
    #     encoder
        self.seq2seq_encoder = lstm_encoder()
    #     decoder
        self.seq2seq_decoder = lstm_decoder()

    def forward(self, encoder_input, decoder_input, inference_threshold=0):
        embedding_encoder_input = self.word_vec(encoder_input)
        embedding_decoder_input = self.word_vec(decoder_input)
        # 调换第一维和第二维度
        embedding_encoder_input = embedding_encoder_input.permute(1, 0, 2)
        embedding_decoder_input = embedding_decoder_input.permute(1, 0, 2)
        # 编码器
        encoder_output, h_n, c_n = self.seq2seq_encoder(embedding_encoder_input)
        # 判断为训练还是预测
        if inference_threshold:
            # 解码器
            decoder_output, h_n, c_n = self.seq2seq_decoder(embedding_decoder_input, encoder_output, h_n, c_n)
            return decoder_output
        else:
            # 创建outputs张量存储Decoder的输出
            outputs = []
            for i in range(seq_length):
                decoder_output, h_n, c_n = self.seq2seq_decoder(embedding_decoder_input, encoder_output, h_n, c_n)
                decoder_x = torch.max(decoder_output.reshape(-1, 25), dim=1)[1].item()
                if decoder_x in [0, 2]:
                    return outputs
                outputs.append(decoder_x)
                embedding_decoder_input = self.word_vec(torch.LongTensor([[decoder_x]]))
                embedding_decoder_input = embedding_decoder_input.permute(1, 0, 2)
            return outputs


model = seq2seq()
print(model)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.05)

# 判断是否有模型文件
if os.path.exists("./seq2seqModel.pkl"):
    model.load_state_dict(torch.load('./seq2seqModel.pkl'))
else:
    # 训练
    model.train()
    for epoch in range(10000):
        pred = model(encoder_input, decoder_input, 1)
        loss = criterion(pred.reshape(-1, 25), decoder_target.view(-1))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        if (epoch + 1) % 1000 == 0:
            print("Epoch: %d,  loss: %.5f " % (epoch + 1, loss))
    # 保存模型
    torch.save(model.state_dict(), './seq2seqModel.pkl')
# 测试
model.eval()
question_text = '你住在哪里'
question_cut = list(jieba.cut(question_text))
encoder_x = make_data([question_cut])
decoder_x = [[word2index['SOS']]]
encoder_x,  decoder_x = torch.LongTensor(encoder_x), torch.LongTensor(decoder_x)
out = model(encoder_x, decoder_x)
answer = ''
for i in out:
    answer += index2word[i]
print('问题:', question_text)
print('回答:', answer)

上一篇:线性查找法


下一篇:机器学习之深度学习学习笔记(五)