TFRS之信息检索

数据源: Movielens dataset

import os
import pprint
import tempfile
# 类型检查,防止运行时出现参数和返回值类型不符合。传入参数参数名:类型,通过 -> 结果类型
from typing import Dict, Text

import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds

加载数据集

# ratings data
ratings = tfds.load('movielens/100k-ratings',split='train')
# Features of all the available movies
movies = tfds.load('movielens/100k-movies',split='train')

查看评分数据集信息

for  x in ratings.take(1).as_numpy_iterator():
	pprint.pprint(x)

模型特征数据

ratings = ratings.map(lambda x :{
	'movie_title': x['movie_title'],
	'user_id': x['user_id']
})
movies = movies.map(lambda x: x['movie_title'])

数据集划分:工业界需要根据时间进行切分,这里随机切分。

tf.random.set_seed(42)
shuffled = ratings.shhuffle(100_000,seed =42,reshuffle_each_iteration =False)

train = shuffled.take(80_000)
test = shuffled.skip(80_000).take(20_000)

构建词汇表

movie_titles = movies.batch(1_000)
user_ids = ratings.batch(1_000_000).map(lambda x:x['user_id'])

unique_movie_titles = np.unique(np.concatenate(list(movie_titles)))
unique_user_ids = np.unique(np.concatenate(list(user_ids)))

unique_movie_titles[:10]

搭建双塔模型

# 定义向量维度
embedding_dimension = 32
# 查询塔
user_model = tf.keras.Sequential([
	tf.keras.layers.StringLookup(
		vocabulary = unique_user_ids, mask_token = None),
	# we add an additional embedding to account for unknown tokens
	tf.keras.layers.Embedding(len(unique_user_ids)+1,embedding_dimension)
])

# 候选塔
movie_model = tf.keras.Sequential([
	tf.keras.layers.StringLookup(vocabulary = unique_movie_titles, mask_token =None),
	tf.keras.layers.Embedding(len(unique_movie_titles)+1,embedding_dimension)
])

可以任意扩展模型复杂度,只需要最后返回各自的embedding

效果评估

思路:模型对正样本( user,movie)pairs的评分高于其他候选集的得分,那么模型就具有很高的准确度。

metrics = tfrs.metrics.FactorizedTopK(
	candidates = movies.batch(128).map(movie_model)
)

损失函数

TFRS中包装了几个损失函数,对于检索阶段,使用Retrieval函数

task = tfrs.tasks.Retrieval(
	metrics = metrics
)

task本身是一个Keras层,它将查询和候选嵌入作为参数,并返回计算的损失:我们将使用它来实现模型的训练循环。

完整模型

class MovielensModel(tfrs.Model):
	def __init__(self,user_model, movie_model):
		super().__init__()
		self.movie_model:tf.keras.Model = movie_model
		self.user_model: tf.keras.Model = user_model
		self.task: tf.keras.layers.Layer =task
	
	def compute_loss(self,features:Dict[Text,tf.Tensor], training= False) -> tf.Tensor:
		# we pick out the user features and pass them into the user model
		user_embeddings = self.user_model(features['user_id'])
		# And pick out the movie features and pass them into the movie model,getting embeddings back
		positive_movie_embeddings = self.movie_model(features['movie_title'])
		
		# The task computes the loss and the metrics
		return self.task(user_embeddings, positive_movie_embeddings)
	

也可以通过继承tf.keras.Model 并且重写train_step和test_step来实现

class NoBaseClassMovielensModel(tf.keras.Model):
	def __init__(self, user_model, movie_model):
		super().__init__()
		self.movie_model: tf.keras.Model = movie_model
		self.user_model: tf.keras.Model = user_model
		self.task: tf.keras.layers.Layer = task
	
	def train_step(self,feature:Dict[Text,tf.Tensor])  -> tf.Tensor:
		# set up a gradient tape to record gradients
		with tf.GradientTape() as tape:
			# loss computation
			user_embeddings = self.user_model(features['user_id'])
			positive_movie_embeddings= self.movie_model(features['movie_title'])
			loss = self.task(user_embeddings,positive_movie_embeddings)
			# handle regularization losses as well
			regularization_loss =sum(self.losses)
			total_loss = loss + regularization_loss
		gradients = tape.gradient(total_loss, self.trainable_variables) 
		self.optimizer.apply_gradients(zip(gradients,self.trainable_variables))
	metrics = {metrics.name: metrics.result() for metirc in self.metrics}
	metircs['loss'] = loss
	metrics['regularization_loss'] = regularization_loss
	metrics['total_loss'] = total_loss
	return  metrics

def test_step(self, features: Dict[Text, tf.Tensor]) -> tf.Tensor:
	
	# loss computation
	user_embeddings = self.user_model(features['user_id'])
	positive_movie_embedding = self.movie_model(features['movie_title'])
	loss = self.task(user_embeddings, positive_movie_embeddings)
	
	# Handle regularization losses as well 
	regularization_loss = sum(self.losses)
	total_loss = loss + regularization_loss
	
	metrics = {metrics.name:metrics.result() for metric in self.metrics}
	metrics['loss'] = loss
	metrics['regularization_loss'] = regularization_loss
	metrics['total_loss'] = total_loss
	return metrics

模型训练和评估

# 实例化模型
model = MovielensModel(user_model, movie_model)
model.compile(optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.1))

对训练集、测试集进行打乱、批次化、缓存处理

cached_train = train.shuffle(100_000).batch(8192).cache()

cache_test = test.batch(4096).cache()

模型训练

model.fit(cache_train, epochs=3)

# 模型测试
model.evaluate(cache_test, return_dict =True)

模型预测

# create a model that takes in raw query features,and recommends movies out of the entire movies dataset
index = tfrs.layers.factorized_top_K.BruteForce(model.user_model)
index.index_from_dataset(
	tf.data.Dataset.zip((movies.batch(100),movies.batch(100).map(model.movie_model)))
)

# get recommendations
_,titles = index(tf.constant(['42']))
print(f'Recommendations for user 42: {titles[0,:3]}')

模型服务

要部署这样的模型,我们只需导出我们在上面创建的BruteForce

# Export the query model
with tmpfile.TemporaryDirectory() as tmp:
	path = os.path.join(tmp,'model')
	
	# Save the index
	tf.saved_model.save(index,path)
	
	# load it back, can also be done in TensorFlow Serving
	loaded = tf.saved_model.load(path)
	
	# pass a user id in , get top predicted movie titles back
	scores,titled = loaded(['42'])
	
	print(f'Recommendations: {titles[0][:3]}')

使用scann package 加速查询速度,下面我们使用tfrs中的ScaNN 层加速查询:

scann_index =tfrs.layers.factorized_top_k.ScaNN(model.user_model)

scann_index.index_from_dataset(
	tf.data.Dataset.zip((movies.batch(100), movies.batch(100).map(model.movie_model)))
)

这一层将执行近似查找:这使得检索稍微不那么精确,但在大型候选集上要快几个数量级。

# Get recommendations.
_, titles = scann_index(tf.constant(["42"]))
print(f"Recommendations for user 42: {titles[0, :3]}")

导出它以提供服务和导出BruteForce层一样简单:

# Export the query model.
with tempfile.TemporaryDirectory() as tmp:
  path = os.path.join(tmp, "model")

  # Save the index.
  tf.saved_model.save(
      index,
      path,
      options=tf.saved_model.SaveOptions(namespace_whitelist=["Scann"])
  )

  # Load it back; can also be done in TensorFlow Serving.
  loaded = tf.saved_model.load(path)

  # Pass a user id in, get top predicted movie titles back.
  scores, titles = loaded(["42"])

  print(f"Recommendations: {titles[0][:3]}")
上一篇:TFRS之特征预处理


下一篇:我奶奶看完都会操作的Git团队项目协作攻略