import torch
import torch as t
import numpy as np
import random
from torch.utils.data import DataLoader
from torch import optim
from torch import nn
from torchnet import meter
import tqdm
class Config(object):
    """Central collection of hyper-parameters, file locations and generation defaults."""

    # ---- corpus / preprocessing ----
    data_path = 'data/'  # directory containing the raw poem text files
    pickle_path = 'tang.npz'  # pre-processed binary dump of the corpus
    category = 'poet.tang'  # corpus category: Tang poems, or Song poems ('poet.song')
    author = None  # restrict learning to a single author when set
    constrain = None  # optional length restriction
    maxlen = 125  # intended cap: longer poems are cut, shorter ones left-padded

    # ---- network architecture ----
    num_layers = 3  # number of stacked LSTM layers
    embedding_dim = 256
    hidden_dim = 512

    # ---- optimisation ----
    lr = 1e-3
    weight_decay = 1e-4
    epoch = 5000
    batch_size = 16
    use_gpu = True

    # ---- reporting / debugging ----
    plot_every = 1000  # reporting interval, counted in batches
    env = 'poetry'  # visdom environment name
    debug_file = '/tmp/debugp'

    # ---- checkpoints ----
    model_path = "./cps_new/tang_0.pth"  # checkpoint loaded before training/generation
    model_prefix = 'cps_new/tang'  # prefix used when writing new checkpoints

    # ---- generation defaults ----
    max_gen_len = 200  # upper bound on the number of generated characters
    prefix_words = '床前明月光,疑是地上霜。'  # not part of the poem; only sets its mood
    start_words = '窗前明月光'  # opening characters of the poem
    acrostic = False  # whether to produce an acrostic poem
class PoetryModel(nn.Module):
    """Character-level language model: embedding -> stacked LSTM -> linear classifier.

    Args:
        vocab_size: number of distinct tokens (rows of the embedding table and
            width of the output logits).
        embedding_dim: size of each token embedding.
        hidden_dim: size of the LSTM hidden state.
        num_layers: number of stacked LSTM layers; defaults to
            ``Config.num_layers`` when omitted.
        dropout: dropout probability applied between LSTM layers.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim,
                 num_layers=None, dropout=0.1):
        super(PoetryModel, self).__init__()
        self.hidden_dim = hidden_dim
        # Remember the depth the LSTM was built with so that ``forward`` can
        # size the initial state without re-reading mutable global config.
        self.num_layers = Config.num_layers if num_layers is None else num_layers
        # Embedding table: vocab_size x embedding_dim.
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        # Main recurrent body.
        self.lstm = nn.LSTM(embedding_dim, self.hidden_dim,
                            num_layers=self.num_layers, dropout=dropout)
        # Projects every hidden state onto vocabulary logits.
        self.linear = nn.Linear(self.hidden_dim, vocab_size)

    def forward(self, input, hidden=None):
        """Run the model over a batch of token indices.

        Args:
            input: 2-D tensor of token indices laid out as (seq_len, batch).
            hidden: optional ``(h_0, c_0)`` pair from a previous call; a zero
                state is used when omitted.

        Returns:
            ``(output, hidden)`` where ``output`` has shape
            (seq_len * batch, vocab_size) and ``hidden`` is the ``(h_n, c_n)``
            pair returned by the LSTM.
        """
        seq_len, batch_size = input.size()
        # (seq_len, batch) -> (seq_len, batch, embedding_dim)
        embeds = self.embeddings(input)
        if hidden is None:
            # new_zeros inherits device and dtype from the embeddings.
            h_0 = embeds.new_zeros(self.num_layers, batch_size, self.hidden_dim)
            c_0 = embeds.new_zeros(self.num_layers, batch_size, self.hidden_dim)
        else:
            h_0, c_0 = hidden
        # output: (seq_len, batch, hidden_dim)
        output, hidden = self.lstm(embeds, (h_0, c_0))
        output = self.linear(output.reshape(seq_len * batch_size, -1))
        return output, hidden


def isChinese(word):
    """Return True if any character of *word* lies in U+4E00..U+9FFF."""
    return any('\u4e00' <= ch <= '\u9fff' for ch in word)
def Load(path="./data_new.txt"):
    """Read the poem corpus and collect its character vocabulary.

    The file is read as UTF-8, one poem per line; line terminators are removed
    and empty lines are skipped. Unlike ``np.loadtxt``, ``#`` is not treated as
    a comment marker.

    Args:
        path: location of the corpus text file.

    Returns:
        ``(datas, chars)`` where ``datas`` is a 1-D NumPy array of strings (one
        entry per non-empty line) and ``chars`` is the sorted list of distinct
        characters found in those lines.
    """
    print("Begin Loading...")
    with open(path, encoding="utf-8") as corpus:
        stripped = (raw.rstrip("\r\n") for raw in corpus)
        lines = [line for line in stripped if line]
    # Always 1-D, even for a single-line or empty file.
    datas = np.array(lines, dtype=str)
    print("Done")
    # Sorting makes the vocabulary order reproducible between runs; a plain
    # list(set(...)) changes with string hash randomisation, which would
    # silently scramble the index <-> character mapping of saved checkpoints.
    _char = sorted(set("".join(lines)))
    return datas, _char
# Module-level setup: build the vocabulary and the training tensor on import.
print("正在初始化......")
datas, char = Load()

# Three special tokens are appended after the characters found in the corpus.
char.extend(['</s>', '<START>', '<EOP>'])
ix2word = dict(enumerate(char))
word2ix = {token: position for position, token in ix2word.items()}

# Only lines of exactly 24 characters are used for training.
len_datas = len(datas)
poems = [line for line in datas if len(line) == 24]
cnt = len(poems)

# Each row is <START> + 24 character indices + <EOP>, i.e. 26 columns.
data = np.empty([cnt, 26], dtype=int)
for row, poem in enumerate(poems):
    encoded = [word2ix['<START>']]
    encoded.extend(word2ix[ch] for ch in poem)
    encoded.append(word2ix['<EOP>'])
    data[row] = encoded

data = t.from_numpy(data)
dataloader = DataLoader(
    data,
    batch_size=Config.batch_size,
    shuffle=True,
    num_workers=0,
)
def generate(model, start_words, ix2word, word2ix, prefix_words=None, poem_len=24):
    """Continue a poem greedily from its opening characters.

    The model is stepped ``poem_len`` times. While the step index is still
    inside ``start_words`` the given character is fed as the next input;
    afterwards the highest-scoring prediction is appended and fed back.
    Generation does not stop at ``'<EOP>'``; it is appended like any other
    token.

    Args:
        model: the language model, called as ``model(input, hidden)`` and
            returning ``(logits, hidden)``.
        start_words: characters the poem must begin with.
        ix2word: mapping from index to token.
        word2ix: mapping from token to index.
        prefix_words: optional text used only to warm up the hidden state; it
            does not appear in the result.
        poem_len: number of model steps taken after the prefix.

    Returns:
        List of tokens: ``start_words`` followed by the generated ones.

    Raises:
        KeyError: if a character of ``start_words`` or ``prefix_words`` is not
            in ``word2ix``.
    """
    results = list(start_words)
    start_words_len = len(results)

    # Inputs must live on the same device as the model, whatever the config says.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else t.device("cpu")

    def as_input(index):
        return t.tensor([[index]], dtype=t.long, device=device)

    # Inference: disable dropout and gradient tracking, then restore the
    # previous mode so that calling this mid-training is harmless.
    was_training = model.training
    model.eval()
    try:
        with t.no_grad():
            token = as_input(word2ix['<START>'])
            hidden = None
            if prefix_words:
                # Feed <START> and then the prefix; only the hidden state is kept.
                for word in prefix_words:
                    _, hidden = model(token, hidden)
                    token = as_input(word2ix[word])
            for i in range(poem_len):
                output, hidden = model(token, hidden)
                if i < start_words_len:
                    # Still inside the given opening: ignore the prediction.
                    token = as_input(word2ix[results[i]])
                else:
                    top_index = output[0].topk(1)[1][0].item()
                    results.append(ix2word[top_index])
                    token = as_input(top_index)
    finally:
        model.train(was_training)
    return results
# Generate an acrostic poem.
def gen_acrostic(model, start_words, ix2word, word2ix, prefix_words=None,
                 max_gen_len=None):
    """Generate a poem whose sentences begin with the characters of *start_words*.

    The model is decoded greedily. Whenever the previously emitted token is a
    sentence break (or the initial ``'<START>'``), the next character of
    ``start_words`` is emitted instead of the model's prediction. Generation
    stops when a sentence break is reached after all of ``start_words`` has
    been used, or after ``max_gen_len`` steps.

    Args:
        model: the language model, called as ``model(input, hidden)`` and
            returning ``(logits, hidden)``.
        start_words: the characters to hide at the head of each sentence.
        ix2word: mapping from index to token.
        word2ix: mapping from token to index.
        prefix_words: optional text used only to warm up the hidden state.
        max_gen_len: maximum number of decoding steps; defaults to
            ``Config.max_gen_len``.

    Returns:
        List of emitted tokens.

    Raises:
        KeyError: if a character of ``start_words`` or ``prefix_words`` is not
            in ``word2ix``.
    """
    if max_gen_len is None:
        max_gen_len = Config.max_gen_len

    # Both ASCII and full-width punctuation end a sentence, so the check works
    # regardless of which form the corpus uses.
    sentence_breaks = {'。', ',', '?', '!', '，', '？', '！', '<START>'}

    result = []
    start_words_len = len(start_words)
    index = 0  # how many acrostic characters have been placed so far
    pre_word = '<START>'

    # Inputs must live on the same device as the model, whatever the config says.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else t.device("cpu")

    def as_input(token_index):
        return t.tensor([[token_index]], dtype=t.long, device=device)

    # Inference: disable dropout and gradient tracking, then restore the mode.
    was_training = model.training
    model.eval()
    try:
        with t.no_grad():
            token = as_input(word2ix['<START>'])
            hidden = None
            if prefix_words:
                for word in prefix_words:
                    _, hidden = model(token, hidden)
                    token = as_input(word2ix[word])
            for _ in range(max_gen_len):
                output, hidden = model(token, hidden)
                top_index = output[0].topk(1)[1][0].item()
                w = ix2word[top_index]
                if pre_word in sentence_breaks:
                    # A sentence just ended: place the next acrostic
                    # character, or finish when none are left.
                    if index == start_words_len:
                        break
                    w = start_words[index]
                    index += 1
                    token = as_input(word2ix[w])
                else:
                    token = as_input(top_index)
                result.append(w)
                pre_word = w
    finally:
        model.train(was_training)
    return result
# Generate a poem conditioned on a topic.
def gen_topic(model, start_words, ix2word, word2ix, prefix_words=None, poem_len=24):
    """Generate a poem after conditioning the hidden state on a topic prefix.

    Steps, in order:
      1. Feed ``'<START>'`` and the characters of ``prefix_words`` (if any).
      2. If a prefix was given, decode up to ``poem_len - len(prefix_words)``
         further steps whose predictions are discarded (only the hidden state
         is kept), stopping early on ``'<EOP>'``.
      3. Take one more greedy step and feed its prediction back.
      4. Decode up to ``poem_len`` steps: characters of ``start_words`` are
         fed first, then greedy predictions are appended until ``'<EOP>'``.

    Args:
        model: the language model, called as ``model(input, hidden)`` and
            returning ``(logits, hidden)``.
        start_words: characters the returned poem must begin with.
        ix2word: mapping from index to token.
        word2ix: mapping from token to index.
        prefix_words: optional topic text; it is not part of the result.
        poem_len: step budget used in stages 2 and 4.

    Returns:
        List of tokens: ``start_words`` followed by the generated ones, without
        ``'<EOP>'``.

    Raises:
        KeyError: if a character of ``start_words`` or ``prefix_words`` is not
            in ``word2ix``.
    """
    results = list(start_words)
    start_words_len = len(results)

    # Inputs must live on the same device as the model, whatever the config says.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else t.device("cpu")

    def as_input(index):
        return t.tensor([[index]], dtype=t.long, device=device)

    def best(logits):
        return logits[0].topk(1)[1][0].item()

    # Inference: disable dropout and gradient tracking, then restore the mode.
    was_training = model.training
    model.eval()
    try:
        with t.no_grad():
            token = as_input(word2ix['<START>'])
            hidden = None

            if prefix_words:
                # Stage 1: condition on the topic.
                for word in prefix_words:
                    _, hidden = model(token, hidden)
                    token = as_input(word2ix[word])

                # Stage 2: warm-up decoding. Nothing is appended to ``results``
                # here, so reaching <EOP> must only stop the loop; deleting the
                # last result would raise IndexError for an empty opening or
                # drop one of the caller's start characters.
                for i in range(poem_len - len(prefix_words)):
                    output, hidden = model(token, hidden)
                    if i < start_words_len:
                        token = as_input(word2ix[results[i]])
                        continue
                    top_index = best(output)
                    token = as_input(top_index)
                    if ix2word[top_index] == '<EOP>':
                        break

            # Stage 3: one bridging step before the real poem starts.
            output, hidden = model(token, hidden)
            token = as_input(best(output))

            # Stage 4: the poem that is actually returned.
            for i in range(poem_len):
                output, hidden = model(token, hidden)
                if i < start_words_len:
                    token = as_input(word2ix[results[i]])
                    continue
                top_index = best(output)
                w = ix2word[top_index]
                if w == '<EOP>':
                    break
                results.append(w)
                token = as_input(top_index)
    finally:
        model.train(was_training)
    return results
def train():
    """Train the poetry model on the module-level ``dataloader``.

    Selects the device from ``Config.use_gpu`` (also stored on
    ``Config.device``), resumes from ``Config.model_path`` when that file
    exists, and runs ``Config.epoch`` epochs of next-character prediction.
    Every ``Config.plot_every`` batches the running mean loss and a sample
    poem are printed and appended to ``result.txt``. After each epoch the
    weights are saved to ``<Config.model_prefix>_<epoch % 2>.pth``, so two
    checkpoint files are overwritten alternately.
    """
    import os

    Config.device = t.device("cuda") if Config.use_gpu else t.device("cpu")
    device = Config.device

    # Build the model and its optimiser.
    model = PoetryModel(len(word2ix),
                        embedding_dim=Config.embedding_dim,
                        hidden_dim=Config.hidden_dim)
    optimizer = optim.Adam(model.parameters(), lr=Config.lr)
    criterion = nn.CrossEntropyLoss()

    # Resume from an existing checkpoint; map_location lets a checkpoint saved
    # on one device be loaded on another.
    if os.path.exists(Config.model_path):
        model.load_state_dict(t.load(Config.model_path, map_location=device))
    else:
        print("checkpoint %s not found, training from scratch" % Config.model_path)
    model.to(device)
    loss_meter = meter.AverageValueMeter()

    # Make sure the checkpoint directory exists before the first save.
    checkpoint_dir = os.path.dirname(Config.model_prefix)
    if checkpoint_dir:
        os.makedirs(checkpoint_dir, exist_ok=True)

    # The context manager guarantees the log file is closed on any exit path.
    with open('result.txt', 'w', encoding='utf-8') as f:
        for epoch in range(Config.epoch):
            loss_meter.reset()
            # Wrapping the loader (not enumerate) lets tqdm show the total.
            for li, data_ in enumerate(tqdm.tqdm(dataloader)):
                # (batch, seq) -> (seq, batch), then move to the compute device.
                data_ = data_.long().transpose(1, 0).contiguous().to(device)
                optimizer.zero_grad()
                # Shifted by one: positions 0..n-2 predict positions 1..n-1.
                input_, target = data_[:-1, :], data_[1:, :]
                output, _ = model(input_)
                # Logits are (seq * batch, vocab), so the targets are flattened
                # to (seq * batch,) to line up with them.
                loss = criterion(output, target.view(-1))
                loss.backward()
                optimizer.step()
                loss_meter.add(loss.item())

                # Periodic progress report with a sample poem.
                if (1 + li) % Config.plot_every == 0:
                    report = "训练损失为%s" % (str(loss_meter.mean))
                    print(report)
                    word = "明月"
                    gen_poetry = ''.join(gen_topic(model, '', ix2word, word2ix, word))
                    print(gen_poetry)
                    f.write(report + '\n' + gen_poetry + '\n')
                    f.flush()
            t.save(model.state_dict(), '%s_%s.pth' % (Config.model_prefix, epoch % 2))
# Script entry point: run training when the file is executed directly.
# NOTE(review): this guard appears before userTest() is defined, and a second
# __main__ guard later in the file calls userTest() — confirm that running
# both in sequence is intended.
if __name__ == '__main__':
    train()
def userTest():
    """Interactive console loop for generating poems with a saved model.

    The model is loaded once from ``Config.model_path`` and put in evaluation
    mode. Each round prints the menu, reads a mode number and then the text
    that mode asks for:

      1. first-line mode  - continue the poem from the entered opening
         (an empty entry generates without an opening);
      2. acrostic mode    - hide the entered characters at sentence heads;
      3. topic mode       - condition on the entered topic (an empty entry
         falls back to ``Config.prefix_words``), starting the poem with
         ``Config.start_words``.

    Invalid menu entries and characters missing from the vocabulary are
    reported and the loop continues; end of input ends the loop.
    """
    device = t.device('cuda') if Config.use_gpu else t.device('cpu')

    # Load the weights once instead of on every round.
    model = PoetryModel(len(ix2word), Config.embedding_dim, Config.hidden_dim)
    model.load_state_dict(t.load(Config.model_path, map_location=device))
    model.to(device)
    model.eval()  # no dropout while generating

    while True:
        print("欢迎使用唐诗生成器,\n"
              "输入1 进入首句生成模式\n"
              "输入2 进入藏头诗生成模式\n"
              "输入3 进入主题诗生成模式\n")
        try:
            try:
                mode = int(input())
            except ValueError:
                print("请输入 1、2 或 3\n")
                continue

            if mode == 1:
                print("请输入您想要的诗歌首句,必须是五言")
                start_words = str(input())
                gen_poetry = ''.join(generate(model, start_words, ix2word, word2ix))
                print("生成的诗句如下:%s\n" % (gen_poetry))
            elif mode == 2:
                print("请输入您想要的诗歌藏头部分,必须是 4 个字")
                start_words = str(input())
                gen_poetry = ''.join(gen_acrostic(model, start_words, ix2word, word2ix))
                print("生成的诗句如下:%s\n" % gen_poetry)
            elif mode == 3:
                print("请输入您想要的诗歌topic,必须是 2 个字")
                # An empty entry keeps the configured default topic.
                prefix_words = str(input()) or Config.prefix_words
                gen_poetry = ''.join(gen_topic(model, Config.start_words, ix2word,
                                               word2ix, prefix_words))
                print("生成的诗句如下:%s\n" % gen_poetry)
        except KeyError as err:
            # A character the model has never seen cannot be encoded.
            print("词表中没有这个字:%s\n" % err)
        except EOFError:
            # Input stream closed: leave the loop instead of crashing.
            return
# Script entry point: start the interactive generator when the file is
# executed directly.
# NOTE(review): an earlier __main__ guard in this file calls train() first, so
# this only runs once that call has returned — confirm this is intended.
if __name__ == '__main__':
    userTest()