pytorch实现IMDB数据集情感分类(全连接层的网络、LSTM)

目录

 

一、任务描述

二、思路分析

三、准备数据集

3.1 基础dataset的准备

3.2 文本序列化

四、构建模型

4.1 仅有全连接层

4.2 LSTM

4.3 训练和测试

五、完整代码

5.1 全连接层实现分类完整代码

5.2 LSTM分类完整代码

5.3 测试结果


一、任务描述

使用Pytorch相关API,设计两种网络结构,一种网络结构中只有全连接层,一种使用文本处理中最为常用的LSTM,将数据集进行10分类,观察并对比两者的分类效果。

模型情感分类的数据集是经典的IMDB数据集,数据集下载地址:http://ai.stanford.edu/~amaas/data/sentiment/。这是一份包含了5万条流行电影的评论数据,其中训练集25000条,测试集25000条。数据格式如下:

pytorch实现IMDB数据集情感分类(全连接层的网络、LSTM)

数据的标签以文件名的方式呈现,图中左边为名称,其中名称包含两部分,分别是序号和情感评分,即序号_情感评分。情感评分中1-4为neg,5-10为pos,共有10个分类。右边为文件中的评论内容。每个文件中文本长度不一定相等。

数据集的组织形式如下:

下载完数据集,在aclImdb文件夹中,有如下文件:

pytorch实现IMDB数据集情感分类(全连接层的网络、LSTM)

traintest分别表示训练数据测试数据所在的文件夹,其中文件夹中的内容如下:

pytorch实现IMDB数据集情感分类(全连接层的网络、LSTM)

pytorch实现IMDB数据集情感分类(全连接层的网络、LSTM)

随意点开一个neg/pos,文件夹中都是txt文件,每个文件代表一条样本数据:

pytorch实现IMDB数据集情感分类(全连接层的网络、LSTM)

这些路径在后续写代码的过程中都会用得到,因为需要根据路径来读取每个txt文件。

 

二、思路分析

具体可以细分为如下几步:

  1. 准备数据集,实例化dataset,准备dataloader,即设计一个类来获取样本

  2. 构建模型,定义模型多少层、形状变化、用什么激活函数等

  3. 模型训练,观察迭代过程中的损失

  4. 模型评估,观察分类的准确率

这里再着重考虑一下评论文本该怎么表示:首先评论的文本需要分词处理,处理成一个个单词,但是由于评论有长有短,所以这里需要统一长度,假设每条评论文本都有max_len个词,比50个词长的文本进行截取操作,比50个词短的文本则填充到50。接着是关于词的表示,我们用word embedding,即词向量的形式表示一个词,词向量的维度是embedding dim。这里词向量是调用pytorch中nn.Embedding方法实现的,按照给定的词向量的维度,该方法会给每个词一个随机的词向量,当然这种方法的词向量肯定不太好,nn.Embedding方法还可以加载预训练好的词向量,比如glove,word2vec等,感兴趣的可以尝试一下。

nn.Embedding方法除了需要指定embedding dim这个参数以外,它是基于一个已有的词典来随机分配词向量的,所以我们还需要从训练数据中构建一个词典,词典的大小是训练样本中的所有词,构建过程下文中会讲到,构建了这个词典后,每个词在词典中都会映射到一个特有的,用数字表示的ID上,比如hello这个词在词典中对应的是367,world对应897。nn.Embedding方法就是按照这个来分配每个词的随机向量表示的。构建完成后,比如在测试阶段,我们也需要对测试样本进行处理,并得到向量表示,这样才能喂给神经网络得到预测结果,此时要是一个词在训练样本从没有出现过,那么这个词在词典中就找不到对应的数字表示了,所以构建词典的时候我们指定一个特殊的值"UNK",其值是0,也就是说,没出现过的词,都映射为0。前面我们还说过评论需要统一长度,填充词也被预先定义成“PAD”,其值是1。

比如对于一个测试样本,分词后得到:["ni", "hao", "shi", "jie"],其中ni和jie在训练样本中出现过,它们的ID分别为34和90,hao和shi没有出现,假设max_len为5那么["ni", "hao", "shi", "jie"]可以表示为:[34, 0, 0, 90, 1],然后每个词都转换为词向量,喂给神经网络,得到输出与实际结果进行比较,观察是否正确分类。

上面叙述了一个大概的流程,具体的过程在下面会详细提到。

 

三、准备数据集

准备数据集和之前的方法一样,实例化dataset,准备dataloader,最终我们的数据可以处理成如下格式:

pytorch实现IMDB数据集情感分类(全连接层的网络、LSTM)

图中示意的是batch_size等于2的情况,也就是说dataloader一次只加载两个样本。其中[4,6]是两个样本的情感标签,后面的text是两个样本中分词得到的内容,形式为(['token1', 'token2'...],['token1', 'token2'...]),元组中每个列表对应一个样本,所以每个[]*有max_len个token。

 

其中关键点在于:

  1. 如何完成基础Dataset的构建和Dataloader的准备

  2. 每个batch中文本的长度不一致的问题如何解决

  3. 每个batch中的文本如何转化为数字序列

3.1 基础dataset的准备

import torch
from torch.utils.data import DataLoader,Dataset
import os
import re

# 路径需要根据情况修改,文件太大的时候可以引用绝对路径
data_base_path = r"data\aclImdb"

#1. 定义tokenize的方法,对评论文本分词
def tokenize(text):
    # fileters = '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n'
    fileters = ['!','"','#','$','%','&','\(','\)','\*','\+',',','-','\.','/',':',';','<','=','>','\?','@'
        ,'\[','\\','\]','^','_','`','\{','\|','\}','~','\t','\n','\x97','\x96','”','“',]
    # sub方法是替换
    text = re.sub("<.*?>"," ",text,flags=re.S)	# 去掉<...>中间的内容,主要是文本内容中存在<br/>等内容
    text = re.sub("|".join(fileters)," ",text,flags=re.S)	# 替换掉特殊字符,'|'是把所有要匹配的特殊字符连在一起
    return [i.strip() for i in text.split()]	# 去掉前后多余的空格

#2. 准备dataset
class ImdbDataset(Dataset):
    def __init__(self,mode):
        super(ImdbDataset,self).__init__()
        # 读取所有的训练文件夹名称
        if mode=="train":
            text_path = [os.path.join(data_base_path,i)  for i in ["train/neg","train/pos"]]
        else:
            text_path =  [os.path.join(data_base_path,i)  for i in ["test/neg","test/pos"]]

        self.total_file_path_list = []
        # 进一步获取所有文件的名称
        for i in text_path:
            self.total_file_path_list.extend([os.path.join(i,j) for j in os.listdir(i)])


    def __getitem__(self, idx):
        cur_path = self.total_file_path_list[idx]
		# 返回path最后的文件名。如果path以/或\结尾,那么就会返回空值。即os.path.split(path)的第二个元素。
        # cur_filename返回的是如:“0_3.txt”的文件名
        cur_filename = os.path.basename(cur_path)
        # 标题的形式是:3_4.txt	前面的3是索引,后面的4是分类
        # 原本的分类是1-10,现在变为0-9
        label = int(cur_filename.split("_")[-1].split(".")[0]) -1 #处理标题,获取label,-1是因为要转化为[0-9]
        text = tokenize(open(cur_path).read().strip()) #直接按照空格进行分词
        return label,text

    def __len__(self):
        return len(self.total_file_path_list)
    
# 测试是否能成功获取数据
dataset = ImdbDataset(mode="train")
print(dataset[0])
# out:(2, ['Story', 'of', 'a', 'man', 'who', 'has', 'unnatural', 'feelings'...])
    
# 2. 实例化,准备dataloader
dataset = ImdbDataset(mode="train")
dataloader = DataLoader(dataset=dataset,batch_size=2,shuffle=True)

#3. 观察数据输出结果,在pytorch新版本(1.6)中,这里已经不能运行了,需要加上下面的`collate_fn`函数来运行,即使能够运行,结果也是不正确的
for idx,(label,text) in enumerate(dataloader):
    print("idx:",idx)
    print("lable:",label)
    print("text:",text)
    break

输出如下:

idx: 0
table: tensor([3, 1])
text: [('I', 'Want'), ('thought', 'a'), ('this', 'great'), ('was', 'recipe'), ('a', 'for'), ('great', 'failure'), ('idea', 'Take'), ('but', 'a'), ('boy', 's'), ('was', 'y'), ('it', 'plot'), ('poorly', 'add'), ('executed', 'in'), ('We', 'some'), ('do', 'weak'), ('get', 'completely'), ('a', 'undeveloped'), ('broad', 'characters'), ('sense', 'and'), ('of', 'than'), ('how', 'throw'), ('complex', 'in'), ('and', 'the'), ('challenging', 'worst'), ('the', 'special'), ('backstage', 'effects'), ('operations', 'a'), ('of', 'horror'), ('a', 'movie'), ('show', 'has'), ('are', 'known'), ('but', 'Let'), ('virtually', 'stew'), ('no', 'for'), ...('show', 'somehow'), ('rather', 'destroy'), ('than', 'every'), ('anything', 'copy'), ('worth', 'of'), ('watching', 'this'), ('for', 'film'), ('its', 'so'), ('own', 'it'), ('merit', 'will')]

很明显这里出现了问题,我们希望的是(['token1', 'token2'...],['token1', 'token2'...])的形式,但是结果中却把单词两两组合了。出现问题的原因在于Dataloader中的参数collate_fn,collate_fn的默认值为torch自定义的default_collate,collate_fn的作用就是对每个batch进行处理,而默认的default_collate处理出错。

在默认定义的collate_fn方法中,有一个参数batch,值为([tokens, label], [tokens, label])。也就是根据你的batch_size,决定元组中有多少个item,默认的collate_fn方法对batch做一个zip操作,把两个输入的item组合在一起,把两个目标值组合在一起,但是这里的输入是['Story', 'of', 'a', 'man', 'who', 'has', 'unnatural']是这样的形式,会再进行一次zip操作,多进行了一次两两组合(!!!),但是这显然不是我们想要的。前面的手写数字识别的程序中,由于图片用像素表示,而且我们已经将图片转换为tensor了,所以不会出错。

那么怎么才能获取到正确结果呢?

方法1:考虑先把数据转化为数字序列,观察其结果是否符合要求,之前使用DataLoader并未出现类似错误

方法2:考虑自定义一个collate_fn,观察结果

这里使用方式2,自定义一个collate_fn,然后观察结果:

# 自定义的collate_fn方法
def collate_fn(batch):
    # 手动zip操作,并转换为list,否则无法获取文本和标签了
    batch = list(zip(*batch))
    labels = torch.tensor(batch[0], dtype=torch.int32)
    texts = batch[1]
    texts = torch.tensor([ws.transform(i, max_len) for i in texts])
    del batch
    # 注意这里long()不可少,否则会报错
    return labels.long(), texts.long()

#此时输出正常
for idx,(label,text) in enumerate(dataloader):
    print("idx:",idx)
    print("label:",label)
    print("text:",text)
    break
# table: tensor([2, 9], dtype=torch.int32)	2, 9是两个分类
# text:([], [])

最后我们可以准备一个get_dataloader方法,更方便的获取数据:

# 获取数据的方法
def get_dataloader(train=True):
    if train:
        mode = 'train'
    else:
        mode = "test"
    dataset = ImdbDataset(mode)
    batch_size = train_batch_size if train else test_batch_size
    return DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)

3.2 文本序列化

我们这里使用的word embedding,不会直接把文本转化为向量,而是先转化为数字,再把数字转化为向量,那么这个过程该如何实现呢?

这里我们可以考虑把文本中的每个词语和其对应的数字,使用字典保存,同时实现方法把句子通过字典映射为包含数字的列表

实现文本序列化之前,考虑以下几点:

  1. 如何使用字典把词语和数字进行对应

  2. 不同的词语出现的次数不尽相同,是否需要对高频或者低频词语进行过滤,以及总的词语数量是否需要进行限制

  3. 得到词典之后,如何把句子转化为数字序列

  4. 不同句子长度不相同,每个batch的句子如何构造成相同的长度(可以对短句子进行填充,填充特殊字符)

  5. 对于新出现的词语在词典中没有出现怎么办(可以使用特殊字符代替

思路分析:

  1. 对所有句子进行分词

  2. 词语存入字典,根据次数对词语进行过滤,并统计次数

  3. 实现文本转数字序列的方法

  4. 实现数字序列转文本方法(其实该任务中用不到这个方法)

# Word2Sequence
class Word2Sequence:
    # 未出现过的词
    UNK_TAG = "UNK"
    PAD_TAG = "PAD"
    # 填充的词
    UNK = 0
    PAD = 1

    def __init__(self):
        self.dict = {
            self.UNK_TAG: self.UNK,
            self.PAD_TAG: self.PAD
        }
        self.count = {}

    def to_index(self, word):
        """word -> index"""
        return self.dict.get(word, self.UNK)

    def to_word(self, index):
        """index -> word"""
        if index in self.inversed_dict:
            return self.inversed_dict[index]
        return self.UNK_TAG

    def __len__(self):
        return len(self.dict)

    def fit(self, sentence):
        """count字典中存储每个单词出现的次数"""
        for word in sentence:
            self.count[word] = self.count.get(word, 0) + 1

    def build_vocab(self, min_count=None, max_count=None, max_feature=None):
        """
        构建词典
        只筛选出现次数在[min_count,max_count]之间的词
        词典最大的容纳的词为max_feature,按照出现次数降序排序,要是max_feature有规定,出现频率很低的词就被舍弃了
        """
        if min_count is not None:
            self.count = {word: count for word, count in self.count.items() if count >= min_count}

        if max_count is not None:
            self.count = {word: count for word, count in self.count.items() if count <= max_count}

        if max_feature is not None:
            self.count = dict(sorted(self.count.items(), lambda x: x[-1], reverse=True)[:max_feature])
        # 给词典中每个词分配一个数字ID
        for word in self.count:
            self.dict[word] = len(self.dict)
        # 构建一个数字映射到单词的词典,方法反向转换,但程序中用不太到
        self.inversed_dict = dict(zip(self.dict.values(), self.dict.keys()))

    def transform(self, sentence, max_len=None):
        """
        根据词典给每个词分配的数字ID,将给定的sentence(字符串序列)转换为数字序列
        max_len:统一文本的单词个数
        """
        if max_len is not None:
            r = [self.PAD] * max_len
        else:
            r = [self.PAD] * len(sentence)
        # 截断文本
        if max_len is not None and len(sentence) > max_len:
            sentence = sentence[:max_len]
        for index, word in enumerate(sentence):
            r[index] = self.to_index(word)
        return np.array(r, dtype=np.int64)

    def inverse_transform(self, indices):
        """数字序列-->单词序列"""
        sentence = []
        for i in indices:
            word = self.to_word(i)
            sentence.append(word)
        return sentence

定义完这个类之后,可以简单测试一下效果:

    # 测试Word2Sequence
    w2s = Word2Sequence()
    voc = [["你", "好", "么"],
           ["你", "好", "哦"]]
    for i in voc:
        w2s.fit(i)
    w2s.build_vocab()
    print(w2s.dict)
    print(w2s.transform(["你", "好", "嘛"]))

结果如下:

{'UNK': 0, 'PAD': 1, '你': 2, '好': 3, '么': 4, '哦': 5}
[2 3 0]

功能经测试正确,那么我们就可以用训练文本构建词典了,注意不能读取测试样本

# 建立词表
def fit_save_word_sequence():
    word_to_sequence = Word2Sequence()
    train_path = [os.path.join(data_base_path, i) for i in ["train/neg", "train/pos"]]
    # total_file_path_list存储总的需要读取的txt文件
    total_file_path_list = []
    for i in train_path:
        total_file_path_list.extend([os.path.join(i, j) for j in os.listdir(i)])
    # tqdm是显示进度条的
    for cur_path in tqdm(total_file_path_list, ascii=True, desc="fitting"):
        word_to_sequence.fit(tokenize(open(cur_path, encoding="utf-8").read().strip()))
    word_to_sequence.build_vocab()
    # 对wordSequesnce进行保存
    pickle.dump(word_to_sequence, open("model/ws.pkl", "wb"))

执行该方法:

    fit_save_word_sequence()

pytorch实现IMDB数据集情感分类(全连接层的网络、LSTM)

结果会生成如下文件,注意,model文件夹需要事先创建!

pytorch实现IMDB数据集情感分类(全连接层的网络、LSTM)

文件生成成功后,可以使用如下的代码加载,ws就是Word2Sequence类的一个实例,在后续工作中我们会用到。

ws = pickle.load(open("./model/ws.pkl", "rb"))

 

四、构建模型

4.1 仅有全连接层

class IMDBModel(nn.Module):
    def __init__(self):
        # 定义了两个全连接层,其中最后一个全连接层使用了softmax激活函数,并将输入的维度转换为10,实现10分类
        super(IMDBModel, self).__init__()
        # nn.Embedding方法参数:
        # len(ws):词典的总的词的数量。
        # 300:词向量的维度,即embedding dim
        # padding_idx,填充词
        self.embedding = nn.Embedding(len(ws), 300, padding_idx=ws.PAD)
        self.fc1 = nn.Linear(max_len * 300, 128)
        self.fc = nn.Linear(128, 10)

    def forward(self, x):
        embed = self.embedding(x)
        embed = embed.view(x.size(0), -1)
        out = self.fc1(embed)
        out = F.relu(out)
        out = self.fc(out)
        return F.log_softmax(out, dim=-1)

 

4.2 LSTM

关于pytorch中LSTM api的使用,已经输入输出的理解,可以参考我之前写的博客:

万字长文:深入理解各类型神经网络(简单神经网络,CNN,LSTM)的输入和输出

class IMDBModel(nn.Module):
    def __init__(self):
        super(IMDBModel, self).__init__()
        self.hidden_size = 64
        self.embedding_dim = 200
        self.num_layer = 2
        self.bidirectional = True
        self.bi_num = 2 if self.bidirectional else 1
        self.dropout = 0.5
        # 以上部分为超参数,可以自行修改
        self.embedding = nn.Embedding(len(ws), self.embedding_dim, padding_idx=ws.PAD)
        self.lstm = nn.LSTM(self.embedding_dim, self.hidden_size,
                            self.num_layer, bidirectional=True, dropout=self.dropout)
        self.fc = nn.Linear(self.hidden_size * self.bi_num, 20)
        self.fc2 = nn.Linear(20, 10)

    def forward(self, x):
        x = self.embedding(x)
        x = x.permute(1, 0, 2)  # 进行轴交换
        h_0, c_0 = self.init_hidden_state(x.size(1))
        _, (h_n, c_n) = self.lstm(x, (h_0, c_0))
        # 只要最后一个lstm单元处理的结果,取前向LSTM和后向LSTM的结果进行简单拼接
        out = torch.cat([h_n[-2, :, :], h_n[-1, :, :]], dim=-1)
        out = self.fc(out)
        out = F.relu(out)
        out = self.fc2(out)
        return F.log_softmax(out, dim=-1)

    def init_hidden_state(self, batch_size):
        h_0 = torch.rand(self.num_layer * self.bi_num, batch_size, self.hidden_size).to(device)
        c_0 = torch.rand(self.num_layer * self.bi_num, batch_size, self.hidden_size).to(device)
        return h_0, c_0

4.3 训练和测试

首先指定一些超参数,放在程序的最前面:

train_batch_size = 512
test_batch_size = 500
max_len = 50

训练和测试:

def train(epoch):
    mode = True
    train_dataloader = get_dataloader(mode)
    for idx, (target, input) in enumerate(train_dataloader):
        optimizer.zero_grad()
        output = imdb_model(input)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if idx % 10 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, idx * len(input), len(train_dataloader.dataset),
                       100. * idx / len(train_dataloader), loss.item()))
            torch.save(imdb_model.state_dict(), "model/mnist_net.pkl")
            torch.save(optimizer.state_dict(), 'model/mnist_optimizer.pkl')


def test():
    test_loss = 0
    correct = 0
    mode = False
    imdb_model.eval()
    test_dataloader = get_dataloader(mode)
    with torch.no_grad():
        for target, input in test_dataloader:
            output = imdb_model(input)
            test_loss += F.nll_loss(output, target, reduction="sum")
            pred = torch.max(output, dim=-1, keepdim=False)[-1]
            correct += pred.eq(target.data).sum()
        test_loss = test_loss / len(test_dataloader.dataset)
        print('\nTest set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(
            test_loss, correct, len(test_dataloader.dataset),
            100. * correct / len(test_dataloader.dataset)))
if __name__ == '__main__':
    # # 测试数据集的功能
    # dataset = ImdbDataset(mode="train")
    # dataloader = DataLoader(dataset=dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)
    # for idx, (label, text) in enumerate(dataloader):
    #     print("idx:", idx)
    #     print("lable:", label)
    #     print("text:", text)
    #     break

    # 测试Word2Sequence
    # w2s = Word2Sequence()
    # voc = [["你", "好", "么"],
    #        ["你", "好", "哦"]]
    # for i in voc:
    #     w2s.fit(i)
    # w2s.build_vocab()
    # print(w2s.dict)
    # print(w2s.transform(["你", "好", "嘛"]))
    fit_save_word_sequence()

    # 训练和测试
    test()
    for i in range(3):
        train(i)
        print(
            "训练第{}轮的测试结果-----------------------------------------------------------------------------------------".format(
                i + 1))
        test()

其中两个不同网络结构训练和测试的代码是相同的,不过在LSTM网络中,有一处参数的变化

train_batch_size = 64

五、完整代码

需要特别注意的是,中间有一行代码:

ws = pickle.load(open("./model/ws.pkl", "rb"))

这行代码得建立词库才能正常执行,也就是必须有这个文件才能不报错,所以在训练和测试之前,先执行:

fit_save_word_sequence()

5.1 全连接层实现分类完整代码

import torch
from torch.utils.data import DataLoader, Dataset
import os
import re
import numpy as np
import pickle
from tqdm import tqdm
import torch.nn as nn
import torch.nn.functional as F
from torch import optim

data_base_path = r'E:\2020.1.16\BaiduNetdiskDownload\python5.0\课件资' \
                 r'料V5.0解压密码:www.hoh0.com\课件资料V5.0\阶段9-人工智' \
                 r'能NLP项目\第四天\代码\data\aclImdb_v1\aclImdb'

train_batch_size = 512
test_batch_size = 500
max_len = 50


# 分词的API
def tokenize(text):
    # fileters = '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n'
    fileters = ['!', '"', '#', '$', '%', '&', '\(', '\)', '\*', '\+', ',', '-', '\.', '/', ':', ';', '<', '=', '>',
                '\?', '@', '\[', '\\', '\]', '^', '_', '`', '\{', '\|', '\}', '~', '\t', '\n', '\x97', '\x96', '”',
                '“', ]
    text = re.sub("<.*?>", " ", text, flags=re.S)
    text = re.sub("|".join(fileters), " ", text, flags=re.S)
    return [i.strip() for i in text.split()]


# 自定义的数据集
class ImdbDataset(Dataset):
    def __init__(self, mode):
        super(ImdbDataset, self).__init__()
        if mode == "train":
            text_path = [os.path.join(data_base_path, i) for i in ["train/neg", "train/pos"]]
        else:
            text_path = [os.path.join(data_base_path, i) for i in ["test/neg", "test/pos"]]

        self.total_file_path_list = []
        for i in text_path:
            self.total_file_path_list.extend([os.path.join(i, j) for j in os.listdir(i)])
        # print(self.total_file_path_list)

    def __getitem__(self, idx):
        cur_path = self.total_file_path_list[idx]
        cur_filename = os.path.basename(cur_path)
        label = int(cur_filename.split("_")[-1].split(".")[0]) - 1
        text = tokenize(open(cur_path, encoding="utf-8").read().strip())
        return label, text

    def __len__(self):
        return len(self.total_file_path_list)


# 自定义的collate_fn方法
def collate_fn(batch):
    # 手动zip操作,并转换为list,否则无法获取文本和标签了
    batch = list(zip(*batch))
    labels = torch.tensor(batch[0], dtype=torch.int32)
    texts = batch[1]
    texts = torch.tensor([ws.transform(i, max_len) for i in texts])
    del batch
    # 注意这里long()不可少,否则会报错
    return labels.long(), texts.long()


# 获取数据的方法
def get_dataloader(train=True):
    if train:
        mode = 'train'
    else:
        mode = "test"
    dataset = ImdbDataset(mode)
    batch_size = train_batch_size if train else test_batch_size
    return DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)


# Word2Sequence
class Word2Sequence:
    # 未出现过的词
    UNK_TAG = "UNK"
    PAD_TAG = "PAD"
    # 填充的词
    UNK = 0
    PAD = 1

    def __init__(self):
        self.dict = {
            self.UNK_TAG: self.UNK,
            self.PAD_TAG: self.PAD
        }
        self.count = {}

    def to_index(self, word):
        """word -> index"""
        return self.dict.get(word, self.UNK)

    def to_word(self, index):
        """index -> word"""
        if index in self.inversed_dict:
            return self.inversed_dict[index]
        return self.UNK_TAG

    def __len__(self):
        return len(self.dict)

    def fit(self, sentence):
        """count字典中存储每个单词出现的次数"""
        for word in sentence:
            self.count[word] = self.count.get(word, 0) + 1

    def build_vocab(self, min_count=None, max_count=None, max_feature=None):
        """
        构建词典
        只筛选出现次数在[min_count,max_count]之间的词
        词典最大的容纳的词为max_feature,按照出现次数降序排序,要是max_feature有规定,出现频率很低的词就被舍弃了
        """
        if min_count is not None:
            self.count = {word: count for word, count in self.count.items() if count >= min_count}

        if max_count is not None:
            self.count = {word: count for word, count in self.count.items() if count <= max_count}

        if max_feature is not None:
            self.count = dict(sorted(self.count.items(), lambda x: x[-1], reverse=True)[:max_feature])
        # 给词典中每个词分配一个数字ID
        for word in self.count:
            self.dict[word] = len(self.dict)
        # 构建一个数字映射到单词的词典,方法反向转换,但程序中用不太到
        self.inversed_dict = dict(zip(self.dict.values(), self.dict.keys()))

    def transform(self, sentence, max_len=None):
        """
        根据词典给每个词分配的数字ID,将给定的sentence(字符串序列)转换为数字序列
        max_len:统一文本的单词个数
        """
        if max_len is not None:
            r = [self.PAD] * max_len
        else:
            r = [self.PAD] * len(sentence)
        # 截断文本
        if max_len is not None and len(sentence) > max_len:
            sentence = sentence[:max_len]
        for index, word in enumerate(sentence):
            r[index] = self.to_index(word)
        return np.array(r, dtype=np.int64)

    def inverse_transform(self, indices):
        """数字序列-->单词序列"""
        sentence = []
        for i in indices:
            word = self.to_word(i)
            sentence.append(word)
        return sentence


# 建立词表
def fit_save_word_sequence():
    word_to_sequence = Word2Sequence()
    train_path = [os.path.join(data_base_path, i) for i in ["train/neg", "train/pos"]]
    # total_file_path_list存储总的需要读取的txt文件
    total_file_path_list = []
    for i in train_path:
        total_file_path_list.extend([os.path.join(i, j) for j in os.listdir(i)])
    # tqdm是显示进度条的
    for cur_path in tqdm(total_file_path_list, ascii=True, desc="fitting"):
        word_to_sequence.fit(tokenize(open(cur_path, encoding="utf-8").read().strip()))
    word_to_sequence.build_vocab()
    # 对wordSequesnce进行保存
    pickle.dump(word_to_sequence, open("model/ws.pkl", "wb"))


ws = pickle.load(open("./model/ws.pkl", "rb"))


# print(len(ws))
# 模型
class IMDBModel(nn.Module):
    def __init__(self):
        # 定义了两个全连接层,其中最后一个全连接层使用了softmax激活函数,并将输入的维度转换为10,实现10分类
        super(IMDBModel, self).__init__()
        # nn.Embedding方法参数:
        # len(ws):词典的总的词的数量。
        # 300:词向量的维度,即embedding dim
        # padding_idx,填充词
        self.embedding = nn.Embedding(len(ws), 300, padding_idx=ws.PAD)
        self.fc1 = nn.Linear(max_len * 300, 128)
        self.fc = nn.Linear(128, 10)

    def forward(self, x):
        embed = self.embedding(x)
        embed = embed.view(x.size(0), -1)
        out = self.fc1(embed)
        out = F.relu(out)
        out = self.fc(out)
        return F.log_softmax(out, dim=-1)


# 实例化
imdb_model = IMDBModel()
# 优化器
optimizer = optim.Adam(imdb_model.parameters())
# 交叉熵损失
criterion = nn.CrossEntropyLoss()


def train(epoch):
    mode = True
    train_dataloader = get_dataloader(mode)
    for idx, (target, input) in enumerate(train_dataloader):
        optimizer.zero_grad()
        output = imdb_model(input)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if idx % 10 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, idx * len(input), len(train_dataloader.dataset),
                       100. * idx / len(train_dataloader), loss.item()))
            torch.save(imdb_model.state_dict(), "model/mnist_net.pkl")
            torch.save(optimizer.state_dict(), 'model/mnist_optimizer.pkl')


def test():
    test_loss = 0
    correct = 0
    mode = False
    imdb_model.eval()
    test_dataloader = get_dataloader(mode)
    with torch.no_grad():
        for target, input in test_dataloader:
            output = imdb_model(input)
            test_loss += F.nll_loss(output, target, reduction="sum")
            pred = torch.max(output, dim=-1, keepdim=False)[-1]
            correct += pred.eq(target.data).sum()
        test_loss = test_loss / len(test_dataloader.dataset)
        print('\nTest set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(
            test_loss, correct, len(test_dataloader.dataset),
            100. * correct / len(test_dataloader.dataset)))


if __name__ == '__main__':
    # # 测试数据集的功能
    # dataset = ImdbDataset(mode="train")
    # dataloader = DataLoader(dataset=dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)
    # for idx, (label, text) in enumerate(dataloader):
    #     print("idx:", idx)
    #     print("lable:", label)
    #     print("text:", text)
    #     break

    # 测试Word2Sequence
    # w2s = Word2Sequence()
    # voc = [["你", "好", "么"],
    #        ["你", "好", "哦"]]
    # for i in voc:
    #     w2s.fit(i)
    # w2s.build_vocab()
    # print(w2s.dict)
    # print(w2s.transform(["你", "好", "嘛"]))
    # fit_save_word_sequence()

    # 训练和测试
    test()
    for i in range(3):
        train(i)
        print(
            "训练第{}轮的测试结果-----------------------------------------------------------------------------------------".format(
                i + 1))
        test()

5.2 LSTM分类完整代码

其中有些注释参照上面的程序,两段代码只有模型定义和一个train_batch_size不相同

import torch
from torch.utils.data import DataLoader, Dataset
import os
import re
import numpy as np
import pickle
from tqdm import tqdm
import torch.nn as nn
import torch.nn.functional as F
from torch import optim

data_base_path = r'E:\2020.1.16\BaiduNetdiskDownload\python5.0\课件资' \
                 r'料V5.0解压密码:www.hoh0.com\课件资料V5.0\阶段9-人工智' \
                 r'能NLP项目\第四天\代码\data\aclImdb_v1\aclImdb'

train_batch_size = 64
test_batch_size = 500
max_len = 50
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# 分词的API
def tokenize(text):
    # fileters = '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n'
    fileters = ['!', '"', '#', '$', '%', '&', '\(', '\)', '\*', '\+', ',', '-', '\.', '/', ':', ';', '<', '=', '>',
                '\?', '@', '\[', '\\', '\]', '^', '_', '`', '\{', '\|', '\}', '~', '\t', '\n', '\x97', '\x96', '”',
                '“', ]
    text = re.sub("<.*?>", " ", text, flags=re.S)
    text = re.sub("|".join(fileters), " ", text, flags=re.S)
    return [i.strip() for i in text.split()]


# 自定义的数据集
class ImdbDataset(Dataset):
    def __init__(self, mode):
        super(ImdbDataset, self).__init__()
        if mode == "train":
            text_path = [os.path.join(data_base_path, i) for i in ["train/neg", "train/pos"]]
        else:
            text_path = [os.path.join(data_base_path, i) for i in ["test/neg", "test/pos"]]

        self.total_file_path_list = []
        for i in text_path:
            self.total_file_path_list.extend([os.path.join(i, j) for j in os.listdir(i)])
        # print(self.total_file_path_list)

    def __getitem__(self, idx):
        cur_path = self.total_file_path_list[idx]
        cur_filename = os.path.basename(cur_path)
        label = int(cur_filename.split("_")[-1].split(".")[0]) - 1
        text = tokenize(open(cur_path, encoding="utf-8").read().strip())
        return label, text

    def __len__(self):
        return len(self.total_file_path_list)


# 自定义的collate_fn方法
def collate_fn(batch):
    batch = list(zip(*batch))
    labels = torch.tensor(batch[0], dtype=torch.int32)
    texts = batch[1]
    texts = torch.tensor([ws.transform(i, max_len) for i in texts])
    del batch
    return labels.long(), texts.long()


# 获取数据的方法
def get_dataloader(train=True):
    if train:
        mode = 'train'
    else:
        mode = "test"
    dataset = ImdbDataset(mode)
    batch_size = train_batch_size if train else test_batch_size
    return DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)


# Word2Sequence
class Word2Sequence:
    UNK_TAG = "UNK"
    PAD_TAG = "PAD"
    UNK = 0
    PAD = 1

    def __init__(self):
        self.dict = {
            self.UNK_TAG: self.UNK,
            self.PAD_TAG: self.PAD
        }
        self.fited = False
        self.count = {}

    def to_index(self, word):
        return self.dict.get(word, self.UNK)

    def to_word(self, index):
        if index in self.inversed_dict:
            return self.inversed_dict[index]
        return self.UNK_TAG

    def __len__(self):
        return len(self.dict)

    def fit(self, sentence):
        for word in sentence:
            self.count[word] = self.count.get(word, 0) + 1

    def build_vocab(self, min_count=None, max_count=None, max_feature=None):
        if min_count is not None:
            self.count = {word: count for word, count in self.count.items() if count >= min_count}

        if max_count is not None:
            self.count = {word: count for word, count in self.count.items() if count <= max_count}

        if max_feature is not None:
            self.count = dict(sorted(self.count.items(), lambda x: x[-1], reverse=True)[:max_feature])

        for word in self.count:
            self.dict[word] = len(self.dict)

        self.inversed_dict = dict(zip(self.dict.values(), self.dict.keys()))

    def transform(self, sentence, max_len=None):
        if max_len is not None:
            r = [self.PAD] * max_len
        else:
            r = [self.PAD] * len(sentence)
        if max_len is not None and len(sentence) > max_len:
            sentence = sentence[:max_len]
        for index, word in enumerate(sentence):
            r[index] = self.to_index(word)
        return np.array(r, dtype=np.int64)

    def inverse_transform(self, indices):
        sentence = []
        for i in indices:
            word = self.to_word(i)
            sentence.append(word)
        return sentence


# 建立词表
def fit_save_word_sequence():
    word_to_sequence = Word2Sequence()
    train_path = [os.path.join(data_base_path, i) for i in ["train/neg", "train/pos"]]
    total_file_path_list = []
    for i in train_path:
        total_file_path_list.extend([os.path.join(i, j) for j in os.listdir(i)])
    for cur_path in tqdm(total_file_path_list, ascii=True, desc="fitting"):
        word_to_sequence.fit(tokenize(open(cur_path, encoding="utf-8").read().strip()))
    word_to_sequence.build_vocab()
    pickle.dump(word_to_sequence, open("model/ws.pkl", "wb"))


ws = pickle.load(open("./model/ws.pkl", "rb"))


# print(len(ws))
# 模型
class IMDBModel(nn.Module):
    def __init__(self):
        super(IMDBModel, self).__init__()
        self.hidden_size = 64
        self.embedding_dim = 200
        self.num_layer = 2
        self.bidirectional = True
        self.bi_num = 2 if self.bidirectional else 1
        self.dropout = 0.5
        # 以上部分为超参数,可以自行修改
        self.embedding = nn.Embedding(len(ws), self.embedding_dim, padding_idx=ws.PAD)
        self.lstm = nn.LSTM(self.embedding_dim, self.hidden_size,
                            self.num_layer, bidirectional=True, dropout=self.dropout)
        self.fc = nn.Linear(self.hidden_size * self.bi_num, 20)
        self.fc2 = nn.Linear(20, 10)

    def forward(self, x):
        x = self.embedding(x)
        x = x.permute(1, 0, 2)  # 进行轴交换
        h_0, c_0 = self.init_hidden_state(x.size(1))
        _, (h_n, c_n) = self.lstm(x, (h_0, c_0))
        # 只要最后一个lstm单元处理的结果,取前向LSTM和后向LSTM的结果进行简单拼接
        out = torch.cat([h_n[-2, :, :], h_n[-1, :, :]], dim=-1)
        out = self.fc(out)
        out = F.relu(out)
        out = self.fc2(out)
        return F.log_softmax(out, dim=-1)

    def init_hidden_state(self, batch_size):
        h_0 = torch.rand(self.num_layer * self.bi_num, batch_size, self.hidden_size).to(device)
        c_0 = torch.rand(self.num_layer * self.bi_num, batch_size, self.hidden_size).to(device)
        return h_0, c_0


imdb_model = IMDBModel()
optimizer = optim.Adam(imdb_model.parameters())
criterion = nn.CrossEntropyLoss()


def train(epoch):
    mode = True
    train_dataloader = get_dataloader(mode)
    for idx, (target, input) in enumerate(train_dataloader):
        optimizer.zero_grad()
        output = imdb_model(input)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if idx % 10 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, idx * len(input), len(train_dataloader.dataset),
                       100. * idx / len(train_dataloader), loss.item()))
            torch.save(imdb_model.state_dict(), "model/mnist_net_lstm.pkl")
            torch.save(optimizer.state_dict(), 'model/mnist_optimizer_lstm.pkl')


def test():
    test_loss = 0
    correct = 0
    mode = False
    imdb_model.eval()
    test_dataloader = get_dataloader(mode)
    with torch.no_grad():
        for target, input in test_dataloader:
            output = imdb_model(input)
            test_loss += F.nll_loss(output, target, reduction="sum")
            pred = torch.max(output, dim=-1, keepdim=False)[-1]
            correct += pred.eq(target.data).sum()
        test_loss = test_loss / len(test_dataloader.dataset)
        print('\nTest set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(
            test_loss, correct, len(test_dataloader.dataset),
            100. * correct / len(test_dataloader.dataset)))


if __name__ == '__main__':
    # # 测试数据集的功能
    # dataset = ImdbDataset(mode="train")
    # dataloader = DataLoader(dataset=dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)
    # for idx, (label, text) in enumerate(dataloader):
    #     print("idx:", idx)
    #     print("lable:", label)
    #     print("text:", text)
    #     break

    # 测试Word2Sequence
    # fit_save_word_sequence()
    # w2s = Word2Sequence()
    # voc = [["你", "好", "么"],
    #        ["你", "好", "哦"]]
    # for i in voc:
    #     w2s.fit(i)
    # w2s.build_vocab()
    # print(w2s.dict)
    # print(w2s.transform(["你", "好", "嘛"]))

    # 训练和测试
    test()
    for i in range(3):
        train(i)
        print(
            "训练第{}轮的测试结果-----------------------------------------------------------------------------------------".format(
                i + 1))
        test()

5.3 测试结果

以下是在我的电脑中,仅仅只使用CPU的情况下跑得的结果,训练和测试都比较慢,其中LSTM的效果不理想,可能是数据集不足以支撑起10分类。

1.没有训练,直接测试:精确率大约在15%左右

2.全连接神经网络:精确率大约在25%左右

3.LSTM:精确率大约在35%左右

结果仅供参考

 

 

上一篇:CV学习笔记(二十二):CRNN+CTC


下一篇:LSTM的梯度、学习率、前项传播等