import os
import pprint
import tempfile
from typing import Dict, Text
import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds
import tensorflow_recommenders as tfrs
数据集
# Ratings data.
ratings = tfds.load("movielens/100k-ratings", split="train")
# Features of all the available movies.
movies = tfds.load("movielens/100k-movies", split="train")
for x in ratings.take(1).as_numpy_iterator():
pprint.pprint(x)
for x in movies.take(1).as_numpy_iterator():
pprint.pprint(x)
ratings = ratings.map(lambda x: {
"movie_title": x["movie_title"],
"user_id": x["user_id"],
})
movies = movies.map(lambda x: x["movie_title"])
tf.random.set_seed(42)
shuffled = ratings.shuffle(100_000, seed=42, reshuffle_each_iteration=False)
train = shuffled.take(80_000)
test = shuffled.skip(80_000).take(20_000)
movie_titles = movies.batch(1_000)
user_ids = ratings.batch(1_000_000).map(lambda x: x["user_id"])
unique_movie_titles = np.unique(np.concatenate(list(movie_titles)))
unique_user_ids = np.unique(np.concatenate(list(user_ids)))
unique_movie_titles[:10]
设置两个虚拟gpu
gpus = tf.config.list_physical_devices("GPU")
if gpus:
# Create 2 virtual GPUs with 1GB memory each
try:
tf.config.set_logical_device_configuration(
gpus[0],
[tf.config.LogicalDeviceConfiguration(memory_limit=1024),
tf.config.LogicalDeviceConfiguration(memory_limit=1024)])
logical_gpus = tf.config.list_logical_devices("GPU")
print(len(gpus), "Physical GPU,", len(logical_gpus), "Logical GPUs")
except RuntimeError as e:
# Virtual devices must be set before GPUs have been initialized
print(e)
strategy = tf.distribute.MirroredStrategy()
我们把相关的模型包装在分布策略范围内:
embedding_dimension = 32
with strategy.scope():
user_model = tf.keras.Sequential([
tf.keras.layers.StringLookup(
vocabulary=unique_user_ids, mask_token=None),
# We add an additional embedding to account for unknown tokens.
tf.keras.layers.Embedding(len(unique_user_ids) + 1, embedding_dimension)
])
movie_model = tf.keras.Sequential([
tf.keras.layers.StringLookup(
vocabulary=unique_movie_titles, mask_token=None),
tf.keras.layers.Embedding(len(unique_movie_titles) + 1, embedding_dimension)
])
metrics = tfrs.metrics.FactorizedTopK(
candidates=movies.batch(128).map(movie_model)
)
task = tfrs.tasks.Retrieval(
metrics=metrics
)
完整模型
class MovielensModel(tfrs.Model):
def __init__(self, user_model, movie_model):
super().__init__()
self.movie_model: tf.keras.Model = movie_model
self.user_model: tf.keras.Model = user_model
self.task: tf.keras.layers.Layer = task
def compute_loss(self, features: Dict[Text, tf.Tensor], training=False) -> tf.Tensor:
# We pick out the user features and pass them into the user model.
user_embeddings = self.user_model(features["user_id"])
# And pick out the movie features and pass them into the movie model,
# getting embeddings back.
positive_movie_embeddings = self.movie_model(features["movie_title"])
# The task computes the loss and the metrics.
return self.task(user_embeddings, positive_movie_embeddings)
模型训练和评估
现在,我们在分布策略范围内实例化和编译模型
with strategy.scope():
model = MovielensModel(user_model, movie_model)
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.1))
训练集、测试集
cached_train = train.shuffle(100_000).batch(8192).cache()
cached_test = test.batch(4096).cache()
模型训练
model.fit(cached_train, epochs=3)
# 模型评估
model.evaluate(cached_test, return_dict=True)