数据源: Movielens dataset
import os
import pprint
import tempfile
# Type hints for static checking (they are not enforced at runtime):
# parameters are annotated as `name: type`, return values with `-> type`.
from typing import Dict, Text

import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds
import tensorflow_recommenders as tfrs
加载数据集
# Interaction log: one record per (user, movie) rating event.
ratings = tfds.load("movielens/100k-ratings", split="train")

# Candidate catalogue: features of every available movie.
movies = tfds.load("movielens/100k-movies", split="train")
查看评分数据集信息
# Pretty-print a single rating record to inspect the available features.
for example in ratings.take(1).as_numpy_iterator():
    pprint.pprint(example)
模型特征数据
# Keep only the two features the retrieval model consumes from each rating.
ratings = ratings.map(
    lambda rating: {
        "movie_title": rating["movie_title"],
        "user_id": rating["user_id"],
    }
)

# Candidates are represented by their title alone.
movies = movies.map(lambda movie: movie["movie_title"])
数据集划分:工业界需要根据时间进行切分,这里随机切分。
# Random 80/20 split of the ratings into train and test sets.
tf.random.set_seed(42)

# Fixed: the method is `shuffle` (the original `shhuffle` raised
# AttributeError). The shuffle is seeded and `reshuffle_each_iteration` is
# disabled so that `take`/`skip` below see the same order on every pass.
shuffled = ratings.shuffle(100_000, seed=42, reshuffle_each_iteration=False)

train = shuffled.take(80_000)
test = shuffled.skip(80_000).take(20_000)
构建词汇表
# Build the vocabularies: every distinct movie title and user id in the data.
movie_titles = movies.batch(1_000)
unique_movie_titles = np.unique(np.concatenate(list(movie_titles)))

user_ids = ratings.batch(1_000_000).map(lambda batch: batch["user_id"])
unique_user_ids = np.unique(np.concatenate(list(user_ids)))

# Notebook-style preview of the first ten titles (no output when run as a script).
unique_movie_titles[:10]
搭建双塔模型
# Dimensionality of the user and movie embedding vectors.
embedding_dimension = 32

# Query tower: user id string -> integer index -> embedding vector.
user_model = tf.keras.Sequential()
user_model.add(
    tf.keras.layers.StringLookup(vocabulary=unique_user_ids, mask_token=None)
)
# One extra embedding row accounts for unknown (out-of-vocabulary) tokens.
user_model.add(
    tf.keras.layers.Embedding(len(unique_user_ids) + 1, embedding_dimension)
)

# Candidate tower: movie title string -> integer index -> embedding vector.
movie_model = tf.keras.Sequential()
movie_model.add(
    tf.keras.layers.StringLookup(vocabulary=unique_movie_titles, mask_token=None)
)
movie_model.add(
    tf.keras.layers.Embedding(len(unique_movie_titles) + 1, embedding_dimension)
)
可以任意扩展模型复杂度,只需要最后返回各自的embedding
效果评估
思路:如果模型对正样本 (user, movie) 对的评分高于对其他所有候选电影的评分,那么模型就具有很高的准确度。
# Embed the whole movie catalogue in batches of 128; these embeddings are the
# candidate set handed to the top-K metric.
candidate_embeddings = movies.batch(128).map(movie_model)
metrics = tfrs.metrics.FactorizedTopK(candidates=candidate_embeddings)
损失函数
TFRS 中封装了多种损失层和任务;对于检索阶段,使用 Retrieval 任务(tfrs.tasks.Retrieval),它把损失函数和指标计算封装在一起。
# Retrieval task, configured with the top-K metric built above.
task = tfrs.tasks.Retrieval(metrics=metrics)
task本身是一个Keras层,它将查询和候选嵌入作为参数,并返回计算的损失:我们将使用它来实现模型的训练循环。
完整模型
class MovielensModel(tfrs.Model):
    """Two-tower retrieval model built on the `tfrs.Model` base class.

    Only `compute_loss` is implemented here; the rest of the training and
    evaluation machinery is inherited from `tfrs.Model`.
    """

    def __init__(self, user_model, movie_model):
        """Store the query tower, the candidate tower and the retrieval task.

        Args:
            user_model: Model mapping user ids to embeddings.
            movie_model: Model mapping movie titles to embeddings.
        """
        super().__init__()
        self.movie_model: tf.keras.Model = movie_model
        self.user_model: tf.keras.Model = user_model
        # `task` is the module-level retrieval task defined earlier in the file.
        self.task: tf.keras.layers.Layer = task

    def compute_loss(
        self, features: Dict[Text, tf.Tensor], training=False
    ) -> tf.Tensor:
        """Return the task's loss for one batch of features.

        Args:
            features: Batch containing the 'user_id' and 'movie_title' entries.
            training: Accepted for interface compatibility; not used here.

        Returns:
            The value returned by the retrieval task for this batch.
        """
        # Embed the query side (users) and the positive candidates (movies).
        query_vectors = self.user_model(features["user_id"])
        candidate_vectors = self.movie_model(features["movie_title"])

        # The task computes the loss and updates its metrics.
        return self.task(query_vectors, candidate_vectors)
也可以通过继承tf.keras.Model 并且重写train_step和test_step来实现
class NoBaseClassMovielensModel(tf.keras.Model):
    """Two-tower retrieval model written directly against `tf.keras.Model`.

    Instead of relying on `tfrs.Model`, this class overrides `train_step` and
    `test_step` itself. Both steps compute the retrieval loss, add any
    regularization losses and report the current metric values.
    """

    def __init__(self, user_model, movie_model):
        """Store the query tower, the candidate tower and the retrieval task.

        Args:
            user_model: Model mapping user ids to embeddings.
            movie_model: Model mapping movie titles to embeddings.
        """
        super().__init__()
        self.movie_model: tf.keras.Model = movie_model
        self.user_model: tf.keras.Model = user_model
        # `task` is the module-level retrieval task defined earlier in the file.
        self.task: tf.keras.layers.Layer = task

    def train_step(self, features: Dict[Text, tf.Tensor]) -> Dict[Text, tf.Tensor]:
        """Run one optimisation step on a batch.

        Args:
            features: Batch containing the 'user_id' and 'movie_title' entries.
                (Fixed: the parameter was declared as `feature` while the body
                read `features`, which raised NameError.)

        Returns:
            Mapping of metric name to current value, plus the 'loss',
            'regularization_loss' and 'total_loss' of this step.
        """
        # Record the forward pass so gradients can be taken afterwards.
        with tf.GradientTape() as tape:
            # Loss computation.
            user_embeddings = self.user_model(features["user_id"])
            positive_movie_embeddings = self.movie_model(features["movie_title"])
            loss = self.task(user_embeddings, positive_movie_embeddings)

            # Handle regularization losses as well.
            regularization_loss = sum(self.losses)
            total_loss = loss + regularization_loss

        gradients = tape.gradient(total_loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))

        # Fixed: the comprehension must use its own loop variable; the original
        # referenced the not-yet-defined `metrics` dict and then wrote the loss
        # into a misspelled `metircs` name.
        metrics = {metric.name: metric.result() for metric in self.metrics}
        metrics["loss"] = loss
        metrics["regularization_loss"] = regularization_loss
        metrics["total_loss"] = total_loss
        return metrics

    def test_step(self, features: Dict[Text, tf.Tensor]) -> Dict[Text, tf.Tensor]:
        """Evaluate one batch without updating any weights.

        Args:
            features: Batch containing the 'user_id' and 'movie_title' entries.

        Returns:
            Mapping of metric name to current value, plus the 'loss',
            'regularization_loss' and 'total_loss' of this batch.
        """
        # Loss computation.
        user_embeddings = self.user_model(features["user_id"])
        # Fixed: the result was stored as `positive_movie_embedding` but read
        # back as `positive_movie_embeddings`.
        positive_movie_embeddings = self.movie_model(features["movie_title"])
        loss = self.task(user_embeddings, positive_movie_embeddings)

        # Handle regularization losses as well.
        regularization_loss = sum(self.losses)
        total_loss = loss + regularization_loss

        # Fixed: use the loop variable `metric`, not the dict being built.
        metrics = {metric.name: metric.result() for metric in self.metrics}
        metrics["loss"] = loss
        metrics["regularization_loss"] = regularization_loss
        metrics["total_loss"] = total_loss
        return metrics
模型训练和评估
# Instantiate the two-tower model and compile it with an Adagrad optimizer.
model = MovielensModel(user_model, movie_model)
optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.1)
model.compile(optimizer=optimizer)
对训练集进行打乱、批次化和缓存处理;测试集只进行批次化和缓存处理
# Training pipeline: shuffle, batch, then cache the batched result.
cached_train = (
    train
    .shuffle(100_000)
    .batch(8192)
    .cache()
)

# Evaluation pipeline: batch and cache only (no shuffling).
cache_test = (
    test
    .batch(4096)
    .cache()
)
模型训练
# Train for three epochs.
# Fixed: the training pipeline is named `cached_train`; the original passed the
# undefined name `cache_train`, which raised NameError.
model.fit(cached_train, epochs=3)

# Evaluate on the held-out split and return the metrics as a dict.
model.evaluate(cache_test, return_dict=True)
模型预测
# Create a model that takes in raw query features and recommends movies out of
# the entire movies dataset.
# Fixed: the module is `factorized_top_k` (lower-case k); `factorized_top_K`
# does not exist and raised AttributeError.
index = tfrs.layers.factorized_top_k.BruteForce(model.user_model)

# Index every movie as a (title, embedding) pair so that queries return titles.
index.index_from_dataset(
    tf.data.Dataset.zip(
        (movies.batch(100), movies.batch(100).map(model.movie_model))
    )
)

# Get recommendations.
_, titles = index(tf.constant(["42"]))
print(f"Recommendations for user 42: {titles[0, :3]}")
模型服务
要部署这样的模型,我们只需导出上面创建的 BruteForce 层。
# Export the query model.
# Fixed: the standard-library module is `tempfile`; `tmpfile` is undefined.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "model")

    # Save the index.
    tf.saved_model.save(index, path)

    # Load it back; this can also be done in TensorFlow Serving.
    loaded = tf.saved_model.load(path)

    # Pass a user id in, get top predicted movie titles back.
    # Fixed: bind the result to `titles` (the original bound `titled`, so the
    # print below showed the stale titles from the earlier in-memory query).
    scores, titles = loaded(["42"])
    print(f"Recommendations: {titles[0][:3]}")
使用scann package 加速查询速度,下面我们使用tfrs中的ScaNN 层加速查询:
# Build a ScaNN index that uses the trained user tower as its query model.
scann_index = tfrs.layers.factorized_top_k.ScaNN(model.user_model)

# Index every movie as a (title, embedding) pair, in batches of 100.
title_batches = movies.batch(100)
embedding_batches = movies.batch(100).map(model.movie_model)
scann_index.index_from_dataset(
    tf.data.Dataset.zip((title_batches, embedding_batches))
)
这一层将执行近似查找:这使得检索稍微不那么精确,但在大型候选集上要快几个数量级。
# Query the ScaNN index for user "42" and show the first three titles.
query = tf.constant(["42"])
_, titles = scann_index(query)
print(f"Recommendations for user 42: {titles[0, :3]}")
导出它以提供服务和导出BruteForce层一样简单:
# Export the ScaNN-backed query model.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "model")

    # Save the index.
    # Fixed: export `scann_index`; the original saved `index` (the BruteForce
    # layer), so the ScaNN layer this section is about was never exported.
    # The "Scann" namespace whitelist is passed for the ScaNN layer's ops.
    tf.saved_model.save(
        scann_index,
        path,
        options=tf.saved_model.SaveOptions(namespace_whitelist=["Scann"]),
    )

    # Load it back; this can also be done in TensorFlow Serving.
    loaded = tf.saved_model.load(path)

    # Pass a user id in, get top predicted movie titles back.
    scores, titles = loaded(["42"])
    print(f"Recommendations: {titles[0][:3]}")