A quick note on a simple way to call BERT with keras_bert. It comes from an expert whose original post I can no longer find; my thanks to them.
import json
import numpy as np
import pandas as pd
from keras_bert import load_trained_model_from_checkpoint, Tokenizer, load_vocabulary
from keras.layers import *
from keras.models import Model
import keras.backend as K
from keras.optimizers import Adam
# Hyperparameters
maxlen = 100
batch_size = 8
dropout_rate = 0.5
learning_rate = 1e-5
epochs = 1
#
path_prefix = "E:/NlpExternalPkg"
# Pre-trained model directory
config_path = path_prefix + "/chinese_L-12_H-768_A-12/bert_config.json"
checkpoint_path = path_prefix + "/chinese_L-12_H-768_A-12/bert_model.ckpt"
dict_path = path_prefix + "/chinese_L-12_H-768_A-12/vocab.txt"
bert_model = load_trained_model_from_checkpoint(config_path, checkpoint_path, trainable=False)
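# Note (assumption, not stated in the original post): with trainable=False the BERT
# weights stay frozen and only the classification head defined below is trained.
# keras_bert also accepts trainable=True here to fine-tune the whole encoder, e.g.
# bert_model = load_trained_model_from_checkpoint(config_path, checkpoint_path, trainable=True)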
x1_in = Input(shape=(None,))
x2_in = Input(shape=(None,))
x = bert_model([x1_in, x2_in])
x = Lambda(lambda x: x[:, 0])(x)  # Take the vector at the [CLS] position for classification; [CLS] is the first token, followed by the vectors for the rest of the sequence
x = Dropout(dropout_rate)(x)
p = Dense(1, activation='sigmoid')(x)
model = Model(inputs=[x1_in, x2_in], outputs=p)
model.compile(
    loss='binary_crossentropy',
    optimizer=Adam(learning_rate),
    metrics=['accuracy']
)
model.summary()
data = []
# Read the data
with open("./data/sentiment.test.data", encoding="utf-8") as f:
    read_data = f.readlines()
for i in read_data:
    a = i.replace("\n", "").split("\t")
    if a[1] == "1":
        data.append((a[0], 1))
    else:
        data.append((a[0], 0))
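# Assumed input format (illustrative lines only; the real sentiment.test.data is not
# shown in the original post): one "text\tlabel" pair per line, e.g.
#   这家餐厅的菜很好吃\t1
#   等了一个小时才上菜\t0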
# Load the vocabulary
token_dict = load_vocabulary(dict_path)  # {token: id, token1: id1, ...}
# Build the tokenizer
tokenizer = Tokenizer(token_dict)
#
# # Word-piece tokenization example
# # ids = tokenizer.tokenize("unaffable")  # ['[CLS]', 'u', '##na', '##ff', '##able', '[SEP]']
# # a, b = tokenizer.encode(first="我不喜欢你", second="你也不喜欢我")
# # a: the token ids
# # b: the segment ids
#
# # Split into training and validation sets at a 9:1 ratio
random_order = list(range(len(data)))
np.random.shuffle(random_order)
train_data = [data[j] for i, j in enumerate(random_order) if i % 10 != 0]
valid_data = [data[j] for i, j in enumerate(random_order) if i % 10 == 0]
def seq_padding(X, padding=0):
    # Right-pad every sequence with `padding` up to the longest sequence in the batch
    L = [len(x) for x in X]
    ML = max(L)
    return np.array([
        np.concatenate([x, [padding] * (ML - len(x))]) if len(x) < ML else x for x in X
    ])
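# Illustration (assumed toy input, not from the original post):
# seq_padding([[1, 2, 3], [4, 5]])
# -> array([[1, 2, 3],
#           [4, 5, 0]])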
class data_generator:
    def __init__(self, data, batch_size=batch_size):
        self.data = data
        self.batch_size = batch_size
        self.steps = len(self.data) // self.batch_size
        if len(self.data) % self.batch_size != 0:
            self.steps += 1

    def __len__(self):
        return self.steps

    def __iter__(self):
        while True:
            idxs = list(range(len(self.data)))
            np.random.shuffle(idxs)
            X1, X2, Y = [], [], []
            for i in idxs:
                d = self.data[i]
                text = d[0][:maxlen]
                x1, x2 = tokenizer.encode(first=text)
                y = d[1]
                X1.append(x1)
                X2.append(x2)
                Y.append([y])
                if len(X1) == self.batch_size or i == idxs[-1]:
                    X1 = seq_padding(X1)
                    X2 = seq_padding(X2)
                    Y = seq_padding(Y)
                    yield [X1, X2], Y  # i.e. batch token ids, batch segment ids, batch labels
                    X1, X2, Y = [], [], []
# train_D = data_generator(train_data)
# valid_D = data_generator(valid_data)
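# Illustrative check (assumption, not in the original post): pulling one batch shows the
# generator yields ([batch_token_ids, batch_segment_ids], batch_labels), each padded to
# the longest sequence in the batch.
# [bx1, bx2], by = next(iter(data_generator(train_data)))
# print(bx1.shape, bx2.shape, by.shape)  # e.g. (8, seq_len) (8, seq_len) (8, 1)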
def get_data(data):
    # Encode every example into fixed-length (maxlen) token-id and segment-id arrays
    idxs = list(range(len(data)))
    np.random.shuffle(idxs)
    X1, X2, Y = [], [], []
    for i in idxs:
        d = data[i]
        text = d[0]
        x1, x2 = tokenizer.encode(first=text)
        y = d[1]
        x1 = np.concatenate([x1, [0] * (maxlen - len(x1))]) if len(x1) < maxlen else x1[:maxlen]
        x2 = np.concatenate([x2, [0] * (maxlen - len(x2))]) if len(x2) < maxlen else x2[:maxlen]
        # y = np.concatenate([y, [0] * (maxlen - len(y))]) if len(y) < maxlen else y[:maxlen]
        X1.append(x1)
        X2.append(x2)
        Y.append(y)
    return X1, X2, Y
X1, X2, Y = get_data(train_data)
print(Y)
model.fit([np.array(X1), np.array(X2)], np.array(Y), batch_size=batch_size, epochs=5)
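# A possible follow-up (assumption, not part of the original script): evaluate on the
# held-out 10% split, prepared the same way as the training data.
# VX1, VX2, VY = get_data(valid_data)
# print(model.evaluate([np.array(VX1), np.array(VX2)], np.array(VY), batch_size=batch_size))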
# model.fit_generator(
# train_D.__iter__(),
# steps_per_epoch=len(train_D),
# epochs=epochs,
# validation_data=valid_D.__iter__(),
# validation_steps=len(valid_D)
# )
# model.save("new.model")
#
# for i in range(5):
#     A, B = [], []
#     a, b = tokenizer.encode(first=data[i][0])
#     A.append(a)
#     B.append(b)
#     A = seq_padding(A)
#     B = seq_padding(B)
#     print(A)
#     print(B)
#     a = model.predict(x=[A, B])
#     print(a, data[i][1])
#     print("------------------------------")
#
# # pred = model.predict()
# # print(pred)