1. Main principle
The continuous bag-of-words model (CBOW, Continuous Bag-of-Words Model) assumes that the center word is generated by its surrounding context in the text sequence; the skip-gram model assumes that the center word generates the context words around it, as illustrated in the figure below.
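In other words, skip-gram maximizes the probability of the surrounding words given the center word, while CBOW maximizes the probability of the center word given its (averaged) context. A sketch of the two training objectives, following the formulations in [1] and [2] (here $T$ is the corpus length and $m$ the window size):

% Skip-gram: the center word predicts each context word within a window of size m
\max_{\theta} \; \frac{1}{T} \sum_{t=1}^{T} \sum_{\substack{-m \le j \le m \\ j \ne 0}} \log p_{\theta}(w_{t+j} \mid w_{t})

% CBOW: the context words jointly predict the center word
\max_{\theta} \; \frac{1}{T} \sum_{t=1}^{T} \log p_{\theta}\big(w_{t} \mid w_{t-m}, \ldots, w_{t-1}, w_{t+1}, \ldots, w_{t+m}\big)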
2. Code implementation
2.1 Preprocess the corpus. First, read the sentences from the corpus and build the vocabulary, mapping each word (or character) to an integer id (v2i) and each id back to its word (i2v). Then convert every sentence into its id sequence and slide a window over it to build the training pairs used by the model.
Click to view the Python code that preprocesses the corpus
import itertools

import numpy as np

# Example settings -- replace `corpus` with your own sentences.
# Sentences must contain at least 2*skip_window + 1 words for CBOW pairing.
corpus = [
    "he is a king and she is a queen",
    "the king and the queen rule the land",
    "he is a man and she is a woman",
]
skip_window = 2     # context half-width
method = "cbow"     # "skip_gram" or "cbow"

all_words = [sentence.split(" ") for sentence in corpus]
all_words = np.array(list(itertools.chain(*all_words)))
# Sort the vocabulary by decreasing frequency, so that small ids mean frequent words;
# this is what the default candidate sampler of tf.nn.nce_loss below assumes.
vocab, v_count = np.unique(all_words, return_counts=True)
vocab = vocab[np.argsort(v_count)[::-1]]
print("vocabulary sorted from most frequent to least frequent:\n", vocab)

v2i = {v: i for i, v in enumerate(vocab)}   # word -> id
i2v = {i: v for v, i in v2i.items()}        # id -> word

# Build (feature, target) pairs with a sliding window.
pairs = []
js = [i for i in range(-skip_window, skip_window + 1) if i != 0]
for c in corpus:
    words = c.split(" ")
    w_idx = [v2i[w] for w in words]
    if method.lower() == "skip_gram":
        for i in range(len(w_idx)):
            for j in js:
                if i + j < 0 or i + j >= len(w_idx):
                    continue
                pairs.append((w_idx[i], w_idx[i + j]))  # (center, context) = (feature, target)
    elif method.lower() == "cbow":
        for i in range(skip_window, len(w_idx) - skip_window):
            context = []
            for j in js:
                context.append(w_idx[i + j])
            pairs.append(context + [w_idx[i]])  # (contexts..., center) = (features..., target)
    else:
        raise ValueError("method must be 'skip_gram' or 'cbow'")
pairs = np.array(pairs)
print("5 example pairs:\n", pairs[:5])

if method.lower() == "skip_gram":
    x, y = pairs[:, 0], pairs[:, 1]
elif method.lower() == "cbow":
    x, y = pairs[:, :-1], pairs[:, -1]
else:
    raise ValueError("method must be 'skip_gram' or 'cbow'")
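As a quick sanity check, the first few CBOW pairs can be decoded back into words to confirm the (contexts, center) layout; this small sketch assumes the `x`, `y`, `i2v`, and `method` variables built above:

# Decode a few CBOW pairs back into words: each row of x is the context window,
# the matching entry of y is the center word it should predict.
if method.lower() == "cbow":
    for ctx_ids, center_id in zip(x[:3], y[:3]):
        print([i2v[i] for i in ctx_ids], "->", i2v[center_id])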
2.2 Define the CBOW model, using the Keras API shipped with TensorFlow (tf.keras).
Click to view the Python code for CBOW
import tensorflow as tf
from tensorflow import keras


class CBOW(keras.Model):
    def __init__(self, v_dim, emb_dim):
        super().__init__()
        self.v_dim = v_dim
        self.embeddings = keras.layers.Embedding(
            input_dim=v_dim, output_dim=emb_dim,  # [n_vocab, emb_dim]
            embeddings_initializer=keras.initializers.RandomNormal(1., 1.1),
        )
        # Parameters for noise-contrastive estimation (tf.nn.nce_loss).
        self.nce_w = self.add_weight(
            name="nce_w", shape=[v_dim, emb_dim],
            initializer=keras.initializers.TruncatedNormal(0., 0.1))  # [n_vocab, emb_dim]
        self.nce_b = self.add_weight(
            name="nce_b", shape=(v_dim,),
            initializer=keras.initializers.Constant(0.1))             # [n_vocab, ]
        self.opt = keras.optimizers.Adam(0.01)

    def call(self, x, training=None, mask=None):
        # x.shape = [n, skip_window*2]
        o = self.embeddings(x)           # [n, skip_window*2, emb_dim]
        o = tf.reduce_mean(o, axis=1)    # average the context embeddings -> [n, emb_dim]
        return o

    # Negative sampling: score the one positive label plus num_sampled negative labels,
    # which avoids computing the full softmax over the whole vocabulary.
    def loss(self, x, y, training=None):
        embedded = self.call(x, training)
        return tf.reduce_mean(
            tf.nn.nce_loss(
                weights=self.nce_w, biases=self.nce_b, labels=tf.expand_dims(y, axis=1),
                inputs=embedded, num_sampled=5, num_classes=self.v_dim))

    def step(self, x, y):
        with tf.GradientTape() as tape:
            loss = self.loss(x, y, True)
        grads = tape.gradient(loss, self.trainable_variables)
        self.opt.apply_gradients(zip(grads, self.trainable_variables))
        return loss.numpy()
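A minimal usage sketch, assuming the `x`, `y`, and `v2i` produced by the preprocessing step in 2.1 (the value `emb_dim=16` is an arbitrary choice, not from the original post):

# Build the model and run one gradient update on a small batch.
model = CBOW(v_dim=len(v2i), emb_dim=16)   # emb_dim chosen arbitrarily here
print("one-step loss:", model.step(x[:8], y[:8]))
# The word vectors live in the embedding matrix, shape [n_vocab, emb_dim].
word_vectors = model.embeddings.get_weights()[0]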
2.3 Train the model. Each step samples a mini-batch of 8 training pairs and applies one gradient update; the loss is printed every 200 steps.
Click to view the code
def train(model, data):
    for t in range(2500):
        bx, by = data.sample(8)   # mini-batch of 8 (features, target) pairs
        loss = model.step(bx, by)
        if t % 200 == 0:
            print("step: {} | loss: {}".format(t, loss))
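The `data` object with a `sample()` method is not shown in the snippets above. A minimal sketch of such a wrapper, under the assumption that it simply draws random rows from the `x`/`y` arrays built in 2.1 (this `Dataset` helper is hypothetical, not part of the original code):

import numpy as np

class Dataset:
    # Hypothetical helper: wraps the (x, y) arrays from section 2.1
    # and returns random mini-batches for training.
    def __init__(self, x, y):
        self.x, self.y = x, y

    def sample(self, n):
        idx = np.random.randint(0, len(self.x), n)
        return self.x[idx], self.y[idx]

# Putting it together:
# model = CBOW(v_dim=len(v2i), emb_dim=16)
# train(model, Dataset(x, y))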
References:
[1] T. Mikolov et al., "Efficient Estimation of Word Representations in Vector Space" (CBOW and skip-gram): https://arxiv.org/pdf/1301.3781.pdf
[2] T. Mikolov et al., "Distributed Representations of Words and Phrases and their Compositionality" (skip-gram with negative sampling): https://arxiv.org/pdf/1310.4546.pdf