keras bert 文本分类实战(学习)

1 .目标

  • 对给出的数据集,判断给出的text,属于什么类型。
    数据集:keras bert 文本分类实战(学习)

2.数据处理

将文本(text)和标签(label)转成计算机可以识别的数字。

  • 首先读取数据,将数据打乱
  • 对label :将label转化为数字对应的数字,并保存

例如:
keras bert 文本分类实战(学习)

  • 构建全部的数据集,变成 [(text1,lable1),(text2,lable2)…]的形式
    例如:
    keras bert 文本分类实战(学习)

  • 划分训练集和验证集

  • 接下来处理text部分,需要将text部分处理成为计算机可以识别的数字,根据bert给的词典进行映射到对应id
    (1) 根据bert给的词典建立 词典字典(词到id的映射)

# 将词表中的词编号转换为字典
tokenDict = {}
with codecs.open(vocabPath, 'r', encoding='utf-8') as reader:
    for line in reader:
        token = line.strip()  
        tokenDict[token] = len(tokenDict)

keras bert 文本分类实战(学习)(2) 对分词器进行重新编写
这是苏神的解读:

  • 本来 Tokenizer 有自己的 _tokenize 方法,我这里重写了这个方法,是要保证 tokenize 之后的结果,跟原来的字符串长度等长(如果算上两个标记,那么就是等长再加 2)。 Tokenizer 自带的 _tokenize 会自动去掉空格,然后有些字符会粘在一块输出,导致 tokenize 之后的列表不等于原来字符串的长度了,这样如果做序列标注的任务会很麻烦。主要就是用 [unused1] 来表示空格类字符,而其余的不在列表的字符用 [UNK] 表示,其中 [unused*] 这些标记是未经训练的(随即初始化),是 Bert 预留出来用来增量添加词汇的标记,所以我们可以用它们来指代任何新字符。
class OurTokenizer(Tokenizer):
    def _tokenize(self, content):
        reList = []
        for t in content:  # 对内容遍历
            if t in self._token_dict:
                reList.append(t)
            elif self._is_space(t):

                # 用[unused1]来表示空格类字符
                reList.append('[unused1]')
            else:
                # 不在列表的字符用[UNK]表示
                reList.append('[UNK]')
        return reList

(3)对text进行编码,使用 data_generator生成器逐批生成数据([X1,X2],Y),并且对数据进行padding=0,使一个batchsize中的句子长度等长。

def seqPadding(X, padding=0):   #充填
    L = [len(x) for x in X]
    ML = max(L)
    return np.array([np.concatenate([x, [padding] * (ML - len(x))]) if len(x) < ML else x for x in X])

class data_generator:
    def __init__(self, data, batch_size=32, shuffle=True):
        self.data = data
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.steps = len(self.data) // self.batch_size
        if len(self.data) % self.batch_size != 0:
            self.steps += 1

    def __len__(self):
        return self.steps

    def __iter__(self):
        while True:
            idxs = list(range(len(self.data)))

            if self.shuffle:
                np.random.shuffle(idxs)

            X1, X2, Y = [], [], []
            for i in idxs:
                d = self.data[i]
                text = d[0][:maxlen]
                x1, x2 = tokenizer.encode(first=text)
                y = d[1]
                X1.append(x1)
                X2.append(x2)
                Y.append([y])
                if len(X1) == self.batch_size or i == idxs[-1]:   #等于一个bachsize或者 到了最后一条样本
                    X1 = seqPadding(X1)
                    X2 = seqPadding(X2)
                    Y = seqPadding(Y)
                    yield [X1, X2], Y  #yield就是 return 返回一个值,并且记住这个返回的位置,下次迭代就从这个位置后开始
                    [X1, X2, Y] = [], [], []

例如:
keras bert 文本分类实战(学习)

3.模型构建

  • 将text通过bert编码,取bert编码后cls(汇集了句子的语义信息),将其喂入一个线性层,输出为15个单元,并通过softmax得到对应的类别概率。
  • 简要图
    keras bert 文本分类实战(学习)
    模型构造和训练:
# bert模型设置
bert_model = load_trained_model_from_checkpoint(configPath, ckpPath, seq_len=None)  # 加载预训练模型

for l in bert_model.layers:
    l.trainable = True

x1_in = Input(shape=(None,))
x2_in = Input(shape=(None,))

x = bert_model([x1_in, x2_in])

# 取出[CLS]对应的向量用来做分类
x = Lambda(lambda x: x[:, 0])(x)
p = Dense(15, activation='softmax')(x)

model = Model([x1_in, x2_in], p)
model.compile(loss='sparse_categorical_crossentropy', optimizer=Adam(1e-5), metrics=['accuracy'])
model.summary()

train_D = data_generator(train_data)
valid_D = data_generator(valid_data)

model.fit_generator(train_D.__iter__(), steps_per_epoch=len(train_D), epochs=5, validation_data=valid_D.__iter__(),
                    validation_steps=len(valid_D))

4.预测

#测试的数据集
str1 = "上港主场1-2负于国安,遭遇联赛两连败,上港到底输在哪?"
str2 = "普京总统会见了拜登总统"
str3 = "这3辆10万出头小钢炮,随便改改轻松秒奔驰,第一辆还是限量款"
predict_D = data_generator([(str1, 0), (str2, 3), (str3, 10)], shuffle=False)
#获取总的标签类别
#array(['体育', '军事', '农业', '国际', '娱乐', '房产', '教育', '文化', '旅游', '民生故事', '汽车','电竞游戏', '科技', '证券股票', '财经'], dtype=object)
output_label2id_file = os.path.join(mainPath, "model/keras_class/label2id.pkl")
if os.path.exists(output_label2id_file):
    with open(output_label2id_file, 'rb') as w:
        labes = pickle.load(w)

#加载保存的模型
from keras_bert import get_custom_objects
custom_objects = get_custom_objects() 
model = load_model(mainPath + 'model/keras_class/tnews.h5', custom_objects=custom_objects)
#使用生成器获取测试的数据
tmpData = predict_D.__iter__()
#预测
preds = model.predict_generator(tmpData, steps=len(predict_D), verbose=1)
# 求每行最大值得下标,其中,axis=1表示按行计算
index_maxs = np.argmax(preds, axis=1)
result = [(x, labes[x]) for x in index_maxs]
print(result)

预测结果:str1:体育,str2:国际,str3:汽车
keras bert 文本分类实战(学习)
下面是训练的总代码:

import pickle
from keras_bert import load_trained_model_from_checkpoint, Tokenizer
from keras.layers import *
from keras.models import Model
from keras.optimizers import Adam
from sklearn.preprocessing import LabelEncoder
from sklearn.utils import shuffle
from keras.utils.vis_utils import plot_model
import codecs, gc
import keras.backend as K
import os
import pandas as pd
import numpy as np


# 文件主路径定义
mainPath = 'D:/project/建行杯/舆情舆论/keras_bert文本分类实例/'

# 从文件中读取数据,获取训练集和验证集
rc = pd.read_csv(mainPath + 'data/tnews/toutiao_news_dataset.txt', delimiter="_!_", names=['labels', 'text'],
                 header=None, encoding='utf-8')   #delimiter

rc = shuffle(rc)  # shuffle数据,打乱

# 把类别转换为数字
# 一共15个类别:"教育","科技","军事","旅游","国际","证券股票","农业","电竞游戏",
# "民生故事","文化","娱乐","体育","财经","房产","汽车"
class_le = LabelEncoder()
rc.iloc[:, 0] = class_le.fit_transform(rc.iloc[:, 0].values)

# 保存标签文件
output_label2id_file = os.path.join(mainPath, "model/keras_class/label2id.pkl")
if not os.path.exists(output_label2id_file):
    with open(output_label2id_file, 'wb') as w:
        pickle.dump(class_le.classes_, w)

# 构建全部所需数据集
data_list = []
for d in rc.iloc[:].itertuples():
    data_list.append((d.text, d.labels))

# 取一部分数据做训练和验证
train_data = data_list[0:20000]
valid_data = data_list[20000:22000]

maxlen = 100  # 设置序列长度为100,要保证序列长度不超过512

# 设置预训练模型
configPath = mainPath + 'chinese_roberta_wwm_ext_L-12_H-768_A-12/bert_config.json'
ckpPath = mainPath + 'chinese_roberta_wwm_ext_L-12_H-768_A-12/bert_model.ckpt'
vocabPath = mainPath + 'chinese_roberta_wwm_ext_L-12_H-768_A-12/vocab.txt'

# 将词表中的词编号转换为字典
tokenDict = {}
with codecs.open(vocabPath, 'r', encoding='utf-8') as reader:
    for line in reader:
        token = line.strip()
        tokenDict[token] = len(tokenDict)


# 重写tokenizer
class OurTokenizer(Tokenizer):
    def _tokenize(self, content):
        reList = []
        for t in content:
            if t in self._token_dict:
                reList.append(t)
            elif self._is_space(t):

                # 用[unused1]来表示空格类字符
                reList.append('[unused1]')
            else:
                # 不在列表的字符用[UNK]表示
                reList.append('[UNK]')
        return reList


tokenizer = OurTokenizer(tokenDict)


def seqPadding(X, padding=0):
    L = [len(x) for x in X]
    ML = max(L)
    return np.array([np.concatenate([x, [padding] * (ML - len(x))]) if len(x) < ML else x for x in X])


class data_generator:  #先将数据变成元组的形式在喂入生成器
    def __init__(self, data, batch_size=32, shuffle=True):
        self.data = data
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.steps = len(self.data) // self.batch_size
        if len(self.data) % self.batch_size != 0:
            self.steps += 1

    def __len__(self):
        return self.steps

    def __iter__(self):
        while True:
            idxs = list(range(len(self.data)))

            if self.shuffle:
                np.random.shuffle(idxs)

            X1, X2, Y = [], [], []
            for i in idxs:
                d = self.data[i]
                text = d[0][:maxlen]
                x1, x2 = tokenizer.encode(first=text)
                y = d[1]
                X1.append(x1)
                X2.append(x2)
                Y.append([y])
                if len(X1) == self.batch_size or i == idxs[-1]:
                    X1 = seqPadding(X1)
                    X2 = seqPadding(X2)
                    Y = seqPadding(Y)
                    yield [X1, X2], Y
                    [X1, X2, Y] = [], [], []


# bert模型设置
bert_model = load_trained_model_from_checkpoint(configPath, ckpPath, seq_len=None)  # 加载预训练模型

for l in bert_model.layers:
    l.trainable = True

x1_in = Input(shape=(None,))
x2_in = Input(shape=(None,))

x = bert_model([x1_in, x2_in])

# 取出[CLS]对应的向量用来做分类
x = Lambda(lambda x: x[:, 0])(x)
p = Dense(15, activation='softmax')(x)

model = Model([x1_in, x2_in], p)
model.compile(loss='sparse_categorical_crossentropy', optimizer=Adam(1e-5), metrics=['accuracy'])
model.summary()

train_D = data_generator(train_data)
valid_D = data_generator(valid_data)

model.fit_generator(train_D.__iter__(), steps_per_epoch=len(train_D), epochs=5, validation_data=valid_D.__iter__(),
                    validation_steps=len(valid_D))

model.save(mainPath + 'model/keras_class/tnews.h5', True, True)

# 保存模型结构图
plot_model(model, to_file='model/keras_class/tnews.png', show_shapes=True)

del model

# 清理内存
gc.collect()

# clear_session就是清除一个session
K.clear_session()
上一篇:Feign设计原理


下一篇:远程调用服务