目录
一、任务描述
使用Pytorch相关API,设计两种网络结构,一种网络结构中只有全连接层,一种使用文本处理中最为常用的LSTM,将数据集进行10分类,观察并对比两者的分类效果。
模型情感分类的数据集是经典的IMDB数据集,数据集下载地址:http://ai.stanford.edu/~amaas/data/sentiment/。这是一份包含了5万条流行电影的评论数据,其中训练集25000条,测试集25000条。数据格式如下:
数据的标签以文件名的方式呈现,图中左边为名称,其中名称包含两部分,分别是序号和情感评分,即序号_情感评分。情感评分中1-4为neg,5-10为pos,共有10个分类。右边为文件中的评论内容。每个文件中文本长度不一定相等。
数据集的组织形式如下:
下载完数据集,在aclImdb文件夹中,有如下文件:
train和test分别表示训练数据和测试数据所在的文件夹,其中文件夹中的内容如下:
随意点开一个neg/pos,文件夹中都是txt文件,每个文件代表一条样本数据:
这些路径在后续写代码的过程中都会用得到,因为需要根据路径来读取每个txt文件。
二、思路分析
具体可以细分为如下几步:
-
准备数据集,实例化dataset,准备dataloader,即设计一个类来获取样本
-
构建模型,定义模型多少层、形状变化、用什么激活函数等
-
模型训练,观察迭代过程中的损失
-
模型评估,观察分类的准确率
这里再着重考虑一下评论文本该怎么表示:首先评论的文本需要分词处理,处理成一个个单词,但是由于评论有长有短,所以这里需要统一长度,假设每条评论文本都有max_len个词,比50个词长的文本进行截取操作,比50个词短的文本则填充到50。接着是关于词的表示,我们用word embedding,即词向量的形式表示一个词,词向量的维度是embedding dim。这里词向量是调用pytorch中nn.Embedding方法实现的,按照给定的词向量的维度,该方法会给每个词一个随机的词向量,当然这种方法的词向量肯定不太好,nn.Embedding方法还可以加载预训练好的词向量,比如glove,word2vec等,感兴趣的可以尝试一下。
nn.Embedding方法除了需要指定embedding dim这个参数以外,它是基于一个已有的词典来随机分配词向量的,所以我们还需要从训练数据中构建一个词典,词典的大小是训练样本中的所有词,构建过程下文中会讲到,构建了这个词典后,每个词在词典中都会映射到一个特有的,用数字表示的ID上,比如hello这个词在词典中对应的是367,world对应897。nn.Embedding方法就是按照这个来分配每个词的随机向量表示的。构建完成后,比如在测试阶段,我们也需要对测试样本进行处理,并得到向量表示,这样才能喂给神经网络得到预测结果,此时要是一个词在训练样本从没有出现过,那么这个词在词典中就找不到对应的数字表示了,所以构建词典的时候我们指定一个特殊的值"UNK",其值是0,也就是说,没出现过的词,都映射为0。前面我们还说过评论需要统一长度,填充词也被预先定义成“PAD”,其值是1。
比如对于一个测试样本,分词后得到:["ni", "hao", "shi", "jie"],其中ni和jie在训练样本中出现过,它们的ID分别为34和90,hao和shi没有出现,假设max_len为5,那么["ni", "hao", "shi", "jie"]可以表示为:[34, 0, 0, 90, 1],然后每个词都转换为词向量,喂给神经网络,得到输出与实际结果进行比较,观察是否正确分类。
上面叙述了一个大概的流程,具体的过程在下面会详细提到。
三、准备数据集
准备数据集和之前的方法一样,实例化dataset,准备dataloader,最终我们的数据可以处理成如下格式:
图中示意的是batch_size等于2的情况,也就是说dataloader一次只加载两个样本。其中[4,6]是两个样本的情感标签,后面的text是两个样本中分词得到的内容,形式为(['token1', 'token2'...],['token1', 'token2'...]),元组中每个列表对应一个样本,所以每个[]*有max_len个token。
其中关键点在于:
-
如何完成基础Dataset的构建和Dataloader的准备
-
每个batch中文本的长度不一致的问题如何解决
-
每个batch中的文本如何转化为数字序列
3.1 基础dataset的准备
import torch
from torch.utils.data import DataLoader,Dataset
import os
import re
# 路径需要根据情况修改,文件太大的时候可以引用绝对路径
data_base_path = r"data\aclImdb"
#1. 定义tokenize的方法,对评论文本分词
def tokenize(text):
# fileters = '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n'
fileters = ['!','"','#','$','%','&','\(','\)','\*','\+',',','-','\.','/',':',';','<','=','>','\?','@'
,'\[','\\','\]','^','_','`','\{','\|','\}','~','\t','\n','\x97','\x96','”','“',]
# sub方法是替换
text = re.sub("<.*?>"," ",text,flags=re.S) # 去掉<...>中间的内容,主要是文本内容中存在<br/>等内容
text = re.sub("|".join(fileters)," ",text,flags=re.S) # 替换掉特殊字符,'|'是把所有要匹配的特殊字符连在一起
return [i.strip() for i in text.split()] # 去掉前后多余的空格
#2. 准备dataset
class ImdbDataset(Dataset):
def __init__(self,mode):
super(ImdbDataset,self).__init__()
# 读取所有的训练文件夹名称
if mode=="train":
text_path = [os.path.join(data_base_path,i) for i in ["train/neg","train/pos"]]
else:
text_path = [os.path.join(data_base_path,i) for i in ["test/neg","test/pos"]]
self.total_file_path_list = []
# 进一步获取所有文件的名称
for i in text_path:
self.total_file_path_list.extend([os.path.join(i,j) for j in os.listdir(i)])
def __getitem__(self, idx):
cur_path = self.total_file_path_list[idx]
# 返回path最后的文件名。如果path以/或\结尾,那么就会返回空值。即os.path.split(path)的第二个元素。
# cur_filename返回的是如:“0_3.txt”的文件名
cur_filename = os.path.basename(cur_path)
# 标题的形式是:3_4.txt 前面的3是索引,后面的4是分类
# 原本的分类是1-10,现在变为0-9
label = int(cur_filename.split("_")[-1].split(".")[0]) -1 #处理标题,获取label,-1是因为要转化为[0-9]
text = tokenize(open(cur_path).read().strip()) #直接按照空格进行分词
return label,text
def __len__(self):
return len(self.total_file_path_list)
# 测试是否能成功获取数据
dataset = ImdbDataset(mode="train")
print(dataset[0])
# out:(2, ['Story', 'of', 'a', 'man', 'who', 'has', 'unnatural', 'feelings'...])
# 2. 实例化,准备dataloader
dataset = ImdbDataset(mode="train")
dataloader = DataLoader(dataset=dataset,batch_size=2,shuffle=True)
#3. 观察数据输出结果,在pytorch新版本(1.6)中,这里已经不能运行了,需要加上下面的`collate_fn`函数来运行,即使能够运行,结果也是不正确的
for idx,(label,text) in enumerate(dataloader):
print("idx:",idx)
print("lable:",label)
print("text:",text)
break
输出如下:
idx: 0
table: tensor([3, 1])
text: [('I', 'Want'), ('thought', 'a'), ('this', 'great'), ('was', 'recipe'), ('a', 'for'), ('great', 'failure'), ('idea', 'Take'), ('but', 'a'), ('boy', 's'), ('was', 'y'), ('it', 'plot'), ('poorly', 'add'), ('executed', 'in'), ('We', 'some'), ('do', 'weak'), ('get', 'completely'), ('a', 'undeveloped'), ('broad', 'characters'), ('sense', 'and'), ('of', 'than'), ('how', 'throw'), ('complex', 'in'), ('and', 'the'), ('challenging', 'worst'), ('the', 'special'), ('backstage', 'effects'), ('operations', 'a'), ('of', 'horror'), ('a', 'movie'), ('show', 'has'), ('are', 'known'), ('but', 'Let'), ('virtually', 'stew'), ('no', 'for'), ...('show', 'somehow'), ('rather', 'destroy'), ('than', 'every'), ('anything', 'copy'), ('worth', 'of'), ('watching', 'this'), ('for', 'film'), ('its', 'so'), ('own', 'it'), ('merit', 'will')]
很明显这里出现了问题,我们希望的是(['token1', 'token2'...],['token1', 'token2'...])的形式,但是结果中却把单词两两组合了。出现问题的原因在于Dataloader
中的参数collate_fn,
collate_fn
的默认值为torch自定义的default_collate
,collate_fn
的作用就是对每个batch进行处理,而默认的default_collate
处理出错。
在默认定义的collate_fn方法中,有一个参数batch,值为([tokens, label], [tokens, label])。也就是根据你的batch_size,决定元组中有多少个item,默认的collate_fn方法对batch做一个zip操作,把两个输入的item组合在一起,把两个目标值组合在一起,但是这里的输入是['Story', 'of', 'a', 'man', 'who', 'has', 'unnatural']是这样的形式,会再进行一次zip操作,多进行了一次两两组合(!!!),但是这显然不是我们想要的。前面的手写数字识别的程序中,由于图片用像素表示,而且我们已经将图片转换为tensor了,所以不会出错。
那么怎么才能获取到正确结果呢?
方法1:考虑先把数据转化为数字序列,观察其结果是否符合要求,之前使用DataLoader并未出现类似错误
方法2:考虑自定义一个collate_fn
,观察结果
这里使用方式2,自定义一个collate_fn
,然后观察结果:
# 自定义的collate_fn方法
def collate_fn(batch):
# 手动zip操作,并转换为list,否则无法获取文本和标签了
batch = list(zip(*batch))
labels = torch.tensor(batch[0], dtype=torch.int32)
texts = batch[1]
texts = torch.tensor([ws.transform(i, max_len) for i in texts])
del batch
# 注意这里long()不可少,否则会报错
return labels.long(), texts.long()
#此时输出正常
for idx,(label,text) in enumerate(dataloader):
print("idx:",idx)
print("label:",label)
print("text:",text)
break
# table: tensor([2, 9], dtype=torch.int32) 2, 9是两个分类
# text:([], [])
最后我们可以准备一个get_dataloader方法,更方便的获取数据:
# 获取数据的方法
def get_dataloader(train=True):
if train:
mode = 'train'
else:
mode = "test"
dataset = ImdbDataset(mode)
batch_size = train_batch_size if train else test_batch_size
return DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
3.2 文本序列化
我们这里使用的word embedding,不会直接把文本转化为向量,而是先转化为数字,再把数字转化为向量,那么这个过程该如何实现呢?
这里我们可以考虑把文本中的每个词语和其对应的数字,使用字典保存,同时实现方法把句子通过字典映射为包含数字的列表。
实现文本序列化之前,考虑以下几点:
-
如何使用字典把词语和数字进行对应
-
不同的词语出现的次数不尽相同,是否需要对高频或者低频词语进行过滤,以及总的词语数量是否需要进行限制
-
得到词典之后,如何把句子转化为数字序列
-
不同句子长度不相同,每个batch的句子如何构造成相同的长度(可以对短句子进行填充,填充特殊字符)
-
对于新出现的词语在词典中没有出现怎么办(可以使用特殊字符代替)
思路分析:
-
对所有句子进行分词
-
词语存入字典,根据次数对词语进行过滤,并统计次数
-
实现文本转数字序列的方法
-
实现数字序列转文本方法(其实该任务中用不到这个方法)
# Word2Sequence
class Word2Sequence:
# 未出现过的词
UNK_TAG = "UNK"
PAD_TAG = "PAD"
# 填充的词
UNK = 0
PAD = 1
def __init__(self):
self.dict = {
self.UNK_TAG: self.UNK,
self.PAD_TAG: self.PAD
}
self.count = {}
def to_index(self, word):
"""word -> index"""
return self.dict.get(word, self.UNK)
def to_word(self, index):
"""index -> word"""
if index in self.inversed_dict:
return self.inversed_dict[index]
return self.UNK_TAG
def __len__(self):
return len(self.dict)
def fit(self, sentence):
"""count字典中存储每个单词出现的次数"""
for word in sentence:
self.count[word] = self.count.get(word, 0) + 1
def build_vocab(self, min_count=None, max_count=None, max_feature=None):
"""
构建词典
只筛选出现次数在[min_count,max_count]之间的词
词典最大的容纳的词为max_feature,按照出现次数降序排序,要是max_feature有规定,出现频率很低的词就被舍弃了
"""
if min_count is not None:
self.count = {word: count for word, count in self.count.items() if count >= min_count}
if max_count is not None:
self.count = {word: count for word, count in self.count.items() if count <= max_count}
if max_feature is not None:
self.count = dict(sorted(self.count.items(), lambda x: x[-1], reverse=True)[:max_feature])
# 给词典中每个词分配一个数字ID
for word in self.count:
self.dict[word] = len(self.dict)
# 构建一个数字映射到单词的词典,方法反向转换,但程序中用不太到
self.inversed_dict = dict(zip(self.dict.values(), self.dict.keys()))
def transform(self, sentence, max_len=None):
"""
根据词典给每个词分配的数字ID,将给定的sentence(字符串序列)转换为数字序列
max_len:统一文本的单词个数
"""
if max_len is not None:
r = [self.PAD] * max_len
else:
r = [self.PAD] * len(sentence)
# 截断文本
if max_len is not None and len(sentence) > max_len:
sentence = sentence[:max_len]
for index, word in enumerate(sentence):
r[index] = self.to_index(word)
return np.array(r, dtype=np.int64)
def inverse_transform(self, indices):
"""数字序列-->单词序列"""
sentence = []
for i in indices:
word = self.to_word(i)
sentence.append(word)
return sentence
定义完这个类之后,可以简单测试一下效果:
# 测试Word2Sequence
w2s = Word2Sequence()
voc = [["你", "好", "么"],
["你", "好", "哦"]]
for i in voc:
w2s.fit(i)
w2s.build_vocab()
print(w2s.dict)
print(w2s.transform(["你", "好", "嘛"]))
结果如下:
{'UNK': 0, 'PAD': 1, '你': 2, '好': 3, '么': 4, '哦': 5}
[2 3 0]
功能经测试正确,那么我们就可以用训练文本构建词典了,注意不能读取测试样本。
# 建立词表
def fit_save_word_sequence():
word_to_sequence = Word2Sequence()
train_path = [os.path.join(data_base_path, i) for i in ["train/neg", "train/pos"]]
# total_file_path_list存储总的需要读取的txt文件
total_file_path_list = []
for i in train_path:
total_file_path_list.extend([os.path.join(i, j) for j in os.listdir(i)])
# tqdm是显示进度条的
for cur_path in tqdm(total_file_path_list, ascii=True, desc="fitting"):
word_to_sequence.fit(tokenize(open(cur_path, encoding="utf-8").read().strip()))
word_to_sequence.build_vocab()
# 对wordSequesnce进行保存
pickle.dump(word_to_sequence, open("model/ws.pkl", "wb"))
执行该方法:
fit_save_word_sequence()
结果会生成如下文件,注意,model文件夹需要事先创建!
文件生成成功后,可以使用如下的代码加载,ws就是Word2Sequence类的一个实例,在后续工作中我们会用到。
ws = pickle.load(open("./model/ws.pkl", "rb"))
四、构建模型
4.1 仅有全连接层
class IMDBModel(nn.Module):
def __init__(self):
# 定义了两个全连接层,其中最后一个全连接层使用了softmax激活函数,并将输入的维度转换为10,实现10分类
super(IMDBModel, self).__init__()
# nn.Embedding方法参数:
# len(ws):词典的总的词的数量。
# 300:词向量的维度,即embedding dim
# padding_idx,填充词
self.embedding = nn.Embedding(len(ws), 300, padding_idx=ws.PAD)
self.fc1 = nn.Linear(max_len * 300, 128)
self.fc = nn.Linear(128, 10)
def forward(self, x):
embed = self.embedding(x)
embed = embed.view(x.size(0), -1)
out = self.fc1(embed)
out = F.relu(out)
out = self.fc(out)
return F.log_softmax(out, dim=-1)
4.2 LSTM
关于pytorch中LSTM api的使用,已经输入输出的理解,可以参考我之前写的博客:
万字长文:深入理解各类型神经网络(简单神经网络,CNN,LSTM)的输入和输出
class IMDBModel(nn.Module):
def __init__(self):
super(IMDBModel, self).__init__()
self.hidden_size = 64
self.embedding_dim = 200
self.num_layer = 2
self.bidirectional = True
self.bi_num = 2 if self.bidirectional else 1
self.dropout = 0.5
# 以上部分为超参数,可以自行修改
self.embedding = nn.Embedding(len(ws), self.embedding_dim, padding_idx=ws.PAD)
self.lstm = nn.LSTM(self.embedding_dim, self.hidden_size,
self.num_layer, bidirectional=True, dropout=self.dropout)
self.fc = nn.Linear(self.hidden_size * self.bi_num, 20)
self.fc2 = nn.Linear(20, 10)
def forward(self, x):
x = self.embedding(x)
x = x.permute(1, 0, 2) # 进行轴交换
h_0, c_0 = self.init_hidden_state(x.size(1))
_, (h_n, c_n) = self.lstm(x, (h_0, c_0))
# 只要最后一个lstm单元处理的结果,取前向LSTM和后向LSTM的结果进行简单拼接
out = torch.cat([h_n[-2, :, :], h_n[-1, :, :]], dim=-1)
out = self.fc(out)
out = F.relu(out)
out = self.fc2(out)
return F.log_softmax(out, dim=-1)
def init_hidden_state(self, batch_size):
h_0 = torch.rand(self.num_layer * self.bi_num, batch_size, self.hidden_size).to(device)
c_0 = torch.rand(self.num_layer * self.bi_num, batch_size, self.hidden_size).to(device)
return h_0, c_0
4.3 训练和测试
首先指定一些超参数,放在程序的最前面:
train_batch_size = 512
test_batch_size = 500
max_len = 50
训练和测试:
def train(epoch):
mode = True
train_dataloader = get_dataloader(mode)
for idx, (target, input) in enumerate(train_dataloader):
optimizer.zero_grad()
output = imdb_model(input)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if idx % 10 == 0:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch, idx * len(input), len(train_dataloader.dataset),
100. * idx / len(train_dataloader), loss.item()))
torch.save(imdb_model.state_dict(), "model/mnist_net.pkl")
torch.save(optimizer.state_dict(), 'model/mnist_optimizer.pkl')
def test():
test_loss = 0
correct = 0
mode = False
imdb_model.eval()
test_dataloader = get_dataloader(mode)
with torch.no_grad():
for target, input in test_dataloader:
output = imdb_model(input)
test_loss += F.nll_loss(output, target, reduction="sum")
pred = torch.max(output, dim=-1, keepdim=False)[-1]
correct += pred.eq(target.data).sum()
test_loss = test_loss / len(test_dataloader.dataset)
print('\nTest set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(
test_loss, correct, len(test_dataloader.dataset),
100. * correct / len(test_dataloader.dataset)))
if __name__ == '__main__':
# # 测试数据集的功能
# dataset = ImdbDataset(mode="train")
# dataloader = DataLoader(dataset=dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)
# for idx, (label, text) in enumerate(dataloader):
# print("idx:", idx)
# print("lable:", label)
# print("text:", text)
# break
# 测试Word2Sequence
# w2s = Word2Sequence()
# voc = [["你", "好", "么"],
# ["你", "好", "哦"]]
# for i in voc:
# w2s.fit(i)
# w2s.build_vocab()
# print(w2s.dict)
# print(w2s.transform(["你", "好", "嘛"]))
fit_save_word_sequence()
# 训练和测试
test()
for i in range(3):
train(i)
print(
"训练第{}轮的测试结果-----------------------------------------------------------------------------------------".format(
i + 1))
test()
其中两个不同网络结构训练和测试的代码是相同的,不过在LSTM网络中,有一处参数的变化:
train_batch_size = 64
五、完整代码
需要特别注意的是,中间有一行代码:
ws = pickle.load(open("./model/ws.pkl", "rb"))
这行代码得建立词库才能正常执行,也就是必须有这个文件才能不报错,所以在训练和测试之前,先执行:
fit_save_word_sequence()
5.1 全连接层实现分类完整代码
import torch
from torch.utils.data import DataLoader, Dataset
import os
import re
import numpy as np
import pickle
from tqdm import tqdm
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
data_base_path = r'E:\2020.1.16\BaiduNetdiskDownload\python5.0\课件资' \
r'料V5.0解压密码:www.hoh0.com\课件资料V5.0\阶段9-人工智' \
r'能NLP项目\第四天\代码\data\aclImdb_v1\aclImdb'
train_batch_size = 512
test_batch_size = 500
max_len = 50
# 分词的API
def tokenize(text):
# fileters = '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n'
fileters = ['!', '"', '#', '$', '%', '&', '\(', '\)', '\*', '\+', ',', '-', '\.', '/', ':', ';', '<', '=', '>',
'\?', '@', '\[', '\\', '\]', '^', '_', '`', '\{', '\|', '\}', '~', '\t', '\n', '\x97', '\x96', '”',
'“', ]
text = re.sub("<.*?>", " ", text, flags=re.S)
text = re.sub("|".join(fileters), " ", text, flags=re.S)
return [i.strip() for i in text.split()]
# 自定义的数据集
class ImdbDataset(Dataset):
def __init__(self, mode):
super(ImdbDataset, self).__init__()
if mode == "train":
text_path = [os.path.join(data_base_path, i) for i in ["train/neg", "train/pos"]]
else:
text_path = [os.path.join(data_base_path, i) for i in ["test/neg", "test/pos"]]
self.total_file_path_list = []
for i in text_path:
self.total_file_path_list.extend([os.path.join(i, j) for j in os.listdir(i)])
# print(self.total_file_path_list)
def __getitem__(self, idx):
cur_path = self.total_file_path_list[idx]
cur_filename = os.path.basename(cur_path)
label = int(cur_filename.split("_")[-1].split(".")[0]) - 1
text = tokenize(open(cur_path, encoding="utf-8").read().strip())
return label, text
def __len__(self):
return len(self.total_file_path_list)
# 自定义的collate_fn方法
def collate_fn(batch):
# 手动zip操作,并转换为list,否则无法获取文本和标签了
batch = list(zip(*batch))
labels = torch.tensor(batch[0], dtype=torch.int32)
texts = batch[1]
texts = torch.tensor([ws.transform(i, max_len) for i in texts])
del batch
# 注意这里long()不可少,否则会报错
return labels.long(), texts.long()
# 获取数据的方法
def get_dataloader(train=True):
if train:
mode = 'train'
else:
mode = "test"
dataset = ImdbDataset(mode)
batch_size = train_batch_size if train else test_batch_size
return DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
# Word2Sequence
class Word2Sequence:
# 未出现过的词
UNK_TAG = "UNK"
PAD_TAG = "PAD"
# 填充的词
UNK = 0
PAD = 1
def __init__(self):
self.dict = {
self.UNK_TAG: self.UNK,
self.PAD_TAG: self.PAD
}
self.count = {}
def to_index(self, word):
"""word -> index"""
return self.dict.get(word, self.UNK)
def to_word(self, index):
"""index -> word"""
if index in self.inversed_dict:
return self.inversed_dict[index]
return self.UNK_TAG
def __len__(self):
return len(self.dict)
def fit(self, sentence):
"""count字典中存储每个单词出现的次数"""
for word in sentence:
self.count[word] = self.count.get(word, 0) + 1
def build_vocab(self, min_count=None, max_count=None, max_feature=None):
"""
构建词典
只筛选出现次数在[min_count,max_count]之间的词
词典最大的容纳的词为max_feature,按照出现次数降序排序,要是max_feature有规定,出现频率很低的词就被舍弃了
"""
if min_count is not None:
self.count = {word: count for word, count in self.count.items() if count >= min_count}
if max_count is not None:
self.count = {word: count for word, count in self.count.items() if count <= max_count}
if max_feature is not None:
self.count = dict(sorted(self.count.items(), lambda x: x[-1], reverse=True)[:max_feature])
# 给词典中每个词分配一个数字ID
for word in self.count:
self.dict[word] = len(self.dict)
# 构建一个数字映射到单词的词典,方法反向转换,但程序中用不太到
self.inversed_dict = dict(zip(self.dict.values(), self.dict.keys()))
def transform(self, sentence, max_len=None):
"""
根据词典给每个词分配的数字ID,将给定的sentence(字符串序列)转换为数字序列
max_len:统一文本的单词个数
"""
if max_len is not None:
r = [self.PAD] * max_len
else:
r = [self.PAD] * len(sentence)
# 截断文本
if max_len is not None and len(sentence) > max_len:
sentence = sentence[:max_len]
for index, word in enumerate(sentence):
r[index] = self.to_index(word)
return np.array(r, dtype=np.int64)
def inverse_transform(self, indices):
"""数字序列-->单词序列"""
sentence = []
for i in indices:
word = self.to_word(i)
sentence.append(word)
return sentence
# 建立词表
def fit_save_word_sequence():
word_to_sequence = Word2Sequence()
train_path = [os.path.join(data_base_path, i) for i in ["train/neg", "train/pos"]]
# total_file_path_list存储总的需要读取的txt文件
total_file_path_list = []
for i in train_path:
total_file_path_list.extend([os.path.join(i, j) for j in os.listdir(i)])
# tqdm是显示进度条的
for cur_path in tqdm(total_file_path_list, ascii=True, desc="fitting"):
word_to_sequence.fit(tokenize(open(cur_path, encoding="utf-8").read().strip()))
word_to_sequence.build_vocab()
# 对wordSequesnce进行保存
pickle.dump(word_to_sequence, open("model/ws.pkl", "wb"))
ws = pickle.load(open("./model/ws.pkl", "rb"))
# print(len(ws))
# 模型
class IMDBModel(nn.Module):
def __init__(self):
# 定义了两个全连接层,其中最后一个全连接层使用了softmax激活函数,并将输入的维度转换为10,实现10分类
super(IMDBModel, self).__init__()
# nn.Embedding方法参数:
# len(ws):词典的总的词的数量。
# 300:词向量的维度,即embedding dim
# padding_idx,填充词
self.embedding = nn.Embedding(len(ws), 300, padding_idx=ws.PAD)
self.fc1 = nn.Linear(max_len * 300, 128)
self.fc = nn.Linear(128, 10)
def forward(self, x):
embed = self.embedding(x)
embed = embed.view(x.size(0), -1)
out = self.fc1(embed)
out = F.relu(out)
out = self.fc(out)
return F.log_softmax(out, dim=-1)
# 实例化
imdb_model = IMDBModel()
# 优化器
optimizer = optim.Adam(imdb_model.parameters())
# 交叉熵损失
criterion = nn.CrossEntropyLoss()
def train(epoch):
mode = True
train_dataloader = get_dataloader(mode)
for idx, (target, input) in enumerate(train_dataloader):
optimizer.zero_grad()
output = imdb_model(input)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if idx % 10 == 0:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch, idx * len(input), len(train_dataloader.dataset),
100. * idx / len(train_dataloader), loss.item()))
torch.save(imdb_model.state_dict(), "model/mnist_net.pkl")
torch.save(optimizer.state_dict(), 'model/mnist_optimizer.pkl')
def test():
test_loss = 0
correct = 0
mode = False
imdb_model.eval()
test_dataloader = get_dataloader(mode)
with torch.no_grad():
for target, input in test_dataloader:
output = imdb_model(input)
test_loss += F.nll_loss(output, target, reduction="sum")
pred = torch.max(output, dim=-1, keepdim=False)[-1]
correct += pred.eq(target.data).sum()
test_loss = test_loss / len(test_dataloader.dataset)
print('\nTest set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(
test_loss, correct, len(test_dataloader.dataset),
100. * correct / len(test_dataloader.dataset)))
if __name__ == '__main__':
# # 测试数据集的功能
# dataset = ImdbDataset(mode="train")
# dataloader = DataLoader(dataset=dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)
# for idx, (label, text) in enumerate(dataloader):
# print("idx:", idx)
# print("lable:", label)
# print("text:", text)
# break
# 测试Word2Sequence
# w2s = Word2Sequence()
# voc = [["你", "好", "么"],
# ["你", "好", "哦"]]
# for i in voc:
# w2s.fit(i)
# w2s.build_vocab()
# print(w2s.dict)
# print(w2s.transform(["你", "好", "嘛"]))
# fit_save_word_sequence()
# 训练和测试
test()
for i in range(3):
train(i)
print(
"训练第{}轮的测试结果-----------------------------------------------------------------------------------------".format(
i + 1))
test()
5.2 LSTM分类完整代码
其中有些注释参照上面的程序,两段代码只有模型定义和一个train_batch_size不相同。
import torch
from torch.utils.data import DataLoader, Dataset
import os
import re
import numpy as np
import pickle
from tqdm import tqdm
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
data_base_path = r'E:\2020.1.16\BaiduNetdiskDownload\python5.0\课件资' \
r'料V5.0解压密码:www.hoh0.com\课件资料V5.0\阶段9-人工智' \
r'能NLP项目\第四天\代码\data\aclImdb_v1\aclImdb'
train_batch_size = 64
test_batch_size = 500
max_len = 50
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 分词的API
def tokenize(text):
# fileters = '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n'
fileters = ['!', '"', '#', '$', '%', '&', '\(', '\)', '\*', '\+', ',', '-', '\.', '/', ':', ';', '<', '=', '>',
'\?', '@', '\[', '\\', '\]', '^', '_', '`', '\{', '\|', '\}', '~', '\t', '\n', '\x97', '\x96', '”',
'“', ]
text = re.sub("<.*?>", " ", text, flags=re.S)
text = re.sub("|".join(fileters), " ", text, flags=re.S)
return [i.strip() for i in text.split()]
# 自定义的数据集
class ImdbDataset(Dataset):
def __init__(self, mode):
super(ImdbDataset, self).__init__()
if mode == "train":
text_path = [os.path.join(data_base_path, i) for i in ["train/neg", "train/pos"]]
else:
text_path = [os.path.join(data_base_path, i) for i in ["test/neg", "test/pos"]]
self.total_file_path_list = []
for i in text_path:
self.total_file_path_list.extend([os.path.join(i, j) for j in os.listdir(i)])
# print(self.total_file_path_list)
def __getitem__(self, idx):
cur_path = self.total_file_path_list[idx]
cur_filename = os.path.basename(cur_path)
label = int(cur_filename.split("_")[-1].split(".")[0]) - 1
text = tokenize(open(cur_path, encoding="utf-8").read().strip())
return label, text
def __len__(self):
return len(self.total_file_path_list)
# 自定义的collate_fn方法
def collate_fn(batch):
batch = list(zip(*batch))
labels = torch.tensor(batch[0], dtype=torch.int32)
texts = batch[1]
texts = torch.tensor([ws.transform(i, max_len) for i in texts])
del batch
return labels.long(), texts.long()
# 获取数据的方法
def get_dataloader(train=True):
if train:
mode = 'train'
else:
mode = "test"
dataset = ImdbDataset(mode)
batch_size = train_batch_size if train else test_batch_size
return DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
# Word2Sequence
class Word2Sequence:
UNK_TAG = "UNK"
PAD_TAG = "PAD"
UNK = 0
PAD = 1
def __init__(self):
self.dict = {
self.UNK_TAG: self.UNK,
self.PAD_TAG: self.PAD
}
self.fited = False
self.count = {}
def to_index(self, word):
return self.dict.get(word, self.UNK)
def to_word(self, index):
if index in self.inversed_dict:
return self.inversed_dict[index]
return self.UNK_TAG
def __len__(self):
return len(self.dict)
def fit(self, sentence):
for word in sentence:
self.count[word] = self.count.get(word, 0) + 1
def build_vocab(self, min_count=None, max_count=None, max_feature=None):
if min_count is not None:
self.count = {word: count for word, count in self.count.items() if count >= min_count}
if max_count is not None:
self.count = {word: count for word, count in self.count.items() if count <= max_count}
if max_feature is not None:
self.count = dict(sorted(self.count.items(), lambda x: x[-1], reverse=True)[:max_feature])
for word in self.count:
self.dict[word] = len(self.dict)
self.inversed_dict = dict(zip(self.dict.values(), self.dict.keys()))
def transform(self, sentence, max_len=None):
if max_len is not None:
r = [self.PAD] * max_len
else:
r = [self.PAD] * len(sentence)
if max_len is not None and len(sentence) > max_len:
sentence = sentence[:max_len]
for index, word in enumerate(sentence):
r[index] = self.to_index(word)
return np.array(r, dtype=np.int64)
def inverse_transform(self, indices):
sentence = []
for i in indices:
word = self.to_word(i)
sentence.append(word)
return sentence
# 建立词表
def fit_save_word_sequence():
word_to_sequence = Word2Sequence()
train_path = [os.path.join(data_base_path, i) for i in ["train/neg", "train/pos"]]
total_file_path_list = []
for i in train_path:
total_file_path_list.extend([os.path.join(i, j) for j in os.listdir(i)])
for cur_path in tqdm(total_file_path_list, ascii=True, desc="fitting"):
word_to_sequence.fit(tokenize(open(cur_path, encoding="utf-8").read().strip()))
word_to_sequence.build_vocab()
pickle.dump(word_to_sequence, open("model/ws.pkl", "wb"))
ws = pickle.load(open("./model/ws.pkl", "rb"))
# print(len(ws))
# 模型
class IMDBModel(nn.Module):
def __init__(self):
super(IMDBModel, self).__init__()
self.hidden_size = 64
self.embedding_dim = 200
self.num_layer = 2
self.bidirectional = True
self.bi_num = 2 if self.bidirectional else 1
self.dropout = 0.5
# 以上部分为超参数,可以自行修改
self.embedding = nn.Embedding(len(ws), self.embedding_dim, padding_idx=ws.PAD)
self.lstm = nn.LSTM(self.embedding_dim, self.hidden_size,
self.num_layer, bidirectional=True, dropout=self.dropout)
self.fc = nn.Linear(self.hidden_size * self.bi_num, 20)
self.fc2 = nn.Linear(20, 10)
def forward(self, x):
x = self.embedding(x)
x = x.permute(1, 0, 2) # 进行轴交换
h_0, c_0 = self.init_hidden_state(x.size(1))
_, (h_n, c_n) = self.lstm(x, (h_0, c_0))
# 只要最后一个lstm单元处理的结果,取前向LSTM和后向LSTM的结果进行简单拼接
out = torch.cat([h_n[-2, :, :], h_n[-1, :, :]], dim=-1)
out = self.fc(out)
out = F.relu(out)
out = self.fc2(out)
return F.log_softmax(out, dim=-1)
def init_hidden_state(self, batch_size):
h_0 = torch.rand(self.num_layer * self.bi_num, batch_size, self.hidden_size).to(device)
c_0 = torch.rand(self.num_layer * self.bi_num, batch_size, self.hidden_size).to(device)
return h_0, c_0
imdb_model = IMDBModel()
optimizer = optim.Adam(imdb_model.parameters())
criterion = nn.CrossEntropyLoss()
def train(epoch):
mode = True
train_dataloader = get_dataloader(mode)
for idx, (target, input) in enumerate(train_dataloader):
optimizer.zero_grad()
output = imdb_model(input)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if idx % 10 == 0:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch, idx * len(input), len(train_dataloader.dataset),
100. * idx / len(train_dataloader), loss.item()))
torch.save(imdb_model.state_dict(), "model/mnist_net_lstm.pkl")
torch.save(optimizer.state_dict(), 'model/mnist_optimizer_lstm.pkl')
def test():
test_loss = 0
correct = 0
mode = False
imdb_model.eval()
test_dataloader = get_dataloader(mode)
with torch.no_grad():
for target, input in test_dataloader:
output = imdb_model(input)
test_loss += F.nll_loss(output, target, reduction="sum")
pred = torch.max(output, dim=-1, keepdim=False)[-1]
correct += pred.eq(target.data).sum()
test_loss = test_loss / len(test_dataloader.dataset)
print('\nTest set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(
test_loss, correct, len(test_dataloader.dataset),
100. * correct / len(test_dataloader.dataset)))
if __name__ == '__main__':
# # 测试数据集的功能
# dataset = ImdbDataset(mode="train")
# dataloader = DataLoader(dataset=dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)
# for idx, (label, text) in enumerate(dataloader):
# print("idx:", idx)
# print("lable:", label)
# print("text:", text)
# break
# 测试Word2Sequence
# fit_save_word_sequence()
# w2s = Word2Sequence()
# voc = [["你", "好", "么"],
# ["你", "好", "哦"]]
# for i in voc:
# w2s.fit(i)
# w2s.build_vocab()
# print(w2s.dict)
# print(w2s.transform(["你", "好", "嘛"]))
# 训练和测试
test()
for i in range(3):
train(i)
print(
"训练第{}轮的测试结果-----------------------------------------------------------------------------------------".format(
i + 1))
test()
5.3 测试结果
以下是在我的电脑中,仅仅只使用CPU的情况下跑得的结果,训练和测试都比较慢,其中LSTM的效果不理想,可能是数据集不足以支撑起10分类。
1.没有训练,直接测试:精确率大约在15%左右
2.全连接神经网络:精确率大约在25%左右
3.LSTM:精确率大约在35%左右
结果仅供参考