A simple way to call BERT with keras_bert

Noting down a simple way to call BERT with keras_bert. It comes from an expert whose post I can no longer find the source of, so my thanks to them here.
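
The full script follows. It assumes the keras-bert package is installed (pip install keras-bert) and that Google's pretrained Chinese BERT checkpoint, chinese_L-12_H-768_A-12, has been downloaded locally; adjust path_prefix to wherever you keep it.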

import json
import numpy as np
import pandas as pd
from keras_bert import load_trained_model_from_checkpoint, Tokenizer, load_vocabulary
from keras.layers import *
from keras.models import Model
import keras.backend as K
from keras.optimizers import Adam

# Hyperparameters
maxlen = 100
batch_size = 8
dropout_rate = 0.5
learning_rate = 1e-5
epochs = 1
#
path_prefix = "E:/NlpExternalPkg"
# Pretrained model directory
config_path = path_prefix + "/chinese_L-12_H-768_A-12/bert_config.json"
checkpoint_path = path_prefix + "/chinese_L-12_H-768_A-12/bert_model.ckpt"
dict_path = path_prefix + "/chinese_L-12_H-768_A-12/vocab.txt"

bert_model = load_trained_model_from_checkpoint(config_path, checkpoint_path, trainable=False)  # trainable=False keeps the BERT weights frozen; only the new head is trained
x1_in = Input(shape=(None,))
x2_in = Input(shape=(None,))
x = bert_model([x1_in, x2_in])
x = Lambda(lambda x: x[:, 0])(x)  # take the vector for [CLS] for classification; [CLS] comes first, followed by the vectors of the rest of the sequence
x = Dropout(dropout_rate)(x)
p = Dense(1, activation='sigmoid')(x)
model = Model(inputs=[x1_in, x2_in], outputs=p)
model.compile(
    loss='binary_crossentropy',
    optimizer=Adam(learning_rate),
    metrics=['accuracy']
)
model.summary()

data = []
# Read the data: each line is "text<TAB>label"
with open("./data/sentiment.test.data", encoding="utf-8") as f:
    read_data = f.readlines()
for i in read_data:
    a = i.replace("\n", "").split("\t")
    data.append((a[0], 1 if a[1] == "1" else 0))
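
# For reference, each line of sentiment.test.data is assumed to look like
# the following (hypothetical examples):
#   这部电影很好看\t1
#   完全不推荐\t0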

# Load the vocabulary
token_dict = load_vocabulary(dict_path)  # {token: id, token1: id1}
# Build the tokenizer
tokenizer = Tokenizer(token_dict)
#
# Tokenization examples:
# ids = tokenizer.tokenize("unaffable")  # ['[CLS]', 'u', '##na', '##ff', '##able', '[SEP]']
# a, b = tokenizer.encode(first="我不喜欢你", second="你也不喜欢我")
# a: token ids
# b: segment ids
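
# A quick sanity check of tokenizer.encode (illustrative; passing max_len
# pads or truncates both outputs to a fixed length):
demo_ids, demo_segments = tokenizer.encode(first="我不喜欢你", second="你也不喜欢我", max_len=maxlen)
print(len(demo_ids), len(demo_segments))  # both have length maxlen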
#
# Split into training and validation sets at a 9:1 ratio
random_order = list(range(len(data)))
np.random.shuffle(random_order)
train_data = [data[j] for i, j in enumerate(random_order) if i % 10 != 0]
valid_data = [data[j] for i, j in enumerate(random_order) if i % 10 == 0]


def seq_padding(X, padding=0):
    # Right-pad every sequence in X with `padding` up to the longest length in the batch
    L = [len(x) for x in X]
    ML = max(L)
    return np.array([
        np.concatenate([x, [padding] * (ML - len(x))]) if len(x) < ML else x for x in X
    ])
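
# A minimal sanity check of seq_padding (illustrative): shorter sequences
# are right-padded with zeros to the batch maximum.
assert seq_padding([[1, 2, 3], [4, 5]]).tolist() == [[1, 2, 3], [4, 5, 0]]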


# Endless batch generator for fit_generator: shuffles, encodes and pads each epoch
class data_generator:
    def __init__(self, data, batch_size=batch_size):
        self.data = data
        self.batch_size = batch_size
        self.steps = len(self.data) // self.batch_size
        if len(self.data) % self.batch_size != 0:
            self.steps += 1

    def __len__(self):
        return self.steps

    def __iter__(self):
        while True:
            idxs = list(range(len(self.data)))
            np.random.shuffle(idxs)
            X1, X2, Y = [], [], []
            for i in idxs:
                d = self.data[i]
                text = d[0][:maxlen]
                x1, x2 = tokenizer.encode(first=text)
                y = d[1]
                X1.append(x1)
                X2.append(x2)
                Y.append([y])
                if len(X1) == self.batch_size or i == idxs[-1]:
                    X1 = seq_padding(X1)
                    X2 = seq_padding(X2)
                    Y = seq_padding(Y)
                    yield [X1, X2], Y
                    X1, X2, Y = [], [], []
                    # i.e. batch_token_ids, batch_segment_ids, batch_labels


# train_D = data_generator(train_data)
# valid_D = data_generator(valid_data)


def get_data(data):
    # Encode the whole dataset at once, padding/truncating everything to maxlen
    idxs = list(range(len(data)))
    np.random.shuffle(idxs)
    X1, X2, Y = [], [], []
    for i in idxs:
        d = data[i]
        text = d[0]
        x1, x2 = tokenizer.encode(first=text)
        y = d[1]
        x1 = np.concatenate([x1, [0] * (maxlen - len(x1))]) if len(x1) < maxlen else x1[:maxlen]
        x2 = np.concatenate([x2, [0] * (maxlen - len(x2))]) if len(x2) < maxlen else x2[:maxlen]
        X1.append(x1)
        X2.append(x2)
        Y.append(y)
    return X1, X2, Y


X1, X2, Y = get_data(train_data)
model.fit([np.array(X1), np.array(X2)], np.array(Y), batch_size=batch_size, epochs=epochs)
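
# Evaluate on the held-out split with the same preprocessing (a minimal sketch,
# reusing get_data from above; note that get_data shuffles, which is harmless here):
VX1, VX2, VY = get_data(valid_data)
val_loss, val_acc = model.evaluate([np.array(VX1), np.array(VX2)], np.array(VY), batch_size=batch_size)
print("val loss %.4f, val acc %.4f" % (val_loss, val_acc))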

# model.fit_generator(
#     train_D.__iter__(),
#     steps_per_epoch=len(train_D),
#     epochs=epochs,
#     validation_data=valid_D.__iter__(),
#     validation_steps=len(valid_D)
# )
# model.save("new.model")
#
# for i in range(5):
#     A, B = [], []
#     a, b = tokenizer.encode(first=data[i][0])
#     A.append(a)
#     B.append(b)
#     A = seq_padding(A)
#     B = seq_padding(B)
#     print(A)
#     print(B)
#     a = model.predict(x=[A, B])
#     print(a, data[i][1])
#     print("------------------------------")
#
