一、MLlib简介
MLlib是一些常用的机器学习算法和库在Spark平台上的实现。MLlib是AMPLab的在研机器学习项目MLBase的底层组件。MLBase是一个机器学习平台,MLI是一个接口层,提供很多结构,MLlib是底层算法实现层。
MLlib中包含分类与回归、聚类、协同过滤、数据降维组件以及底层的优化库。
MLlib底层使用到了Scala书写的线性代数库Breeze,Breeze底层依赖netlib-java库。netlib-java底层依赖原生的Fortran routines。
二、协同过滤推荐算法
协同过滤算法(collaborative filtering,CF)是推荐系统中应用最广泛的推荐算法之一。协同过滤的实质,就是通过预测用户-物品矩阵中缺失的评分,来预测用户对物品的偏好。更加具体地,协同过滤算法主要分为memory-based CF和Model-based CF,而memory-based CF包括User-based CF和Item-based CF。
矩阵分解
常用的协同过滤矩阵分解算法包括如下3种:
- 奇异值分解(Singular Value Decomposition,SVD)SVD将用户-项目评分矩阵R直接分解为用户特征矩阵U、奇异值矩阵Σ和物品特征矩阵V的乘积。若采用奇异值分解,需要首先填充用户-项目评分矩阵。显然,这样会造成了两个问题:其一,填充大大增加了数据量,增加了算法复杂度;其二,简单粗暴的数据填充很容易造成数据失真。如果不填充评分矩阵,将空值设置为0,那么奇异值分解又会造成过学习的问题。
- 正则化矩阵分解(Regularized Matrix Factorization)因为矩阵分解的常用评价指标是均方根误差(Root Mean Squared Error,RMSE),那么可以直接最小化RMSE学习用户特征矩阵U和物品特征V,并加入一个正则化项来避免过拟合。
- 带偏置的矩阵分解(Biased Matrix Factorization)于宽容的用户来说,它的评分行为普遍偏高,而对挑剔的用户来说,他的评分记录普遍偏低,即使他们对同一物品的评分相同,但是他们对该物品的喜好程度却并不一样。把这些独立于用户或独立于物品的因素称为偏置(Bias)部分,将用户和物品的交互即用户对物品的喜好部分称为个性化部分。
三、MLlib的推荐算法工具
MLlib是Spark中用于机器学习的强大工具包。协同过滤推荐是MLlib提供的核心功能之一,在Spark的内置包org.apache.spark.mllib.recommendation是集成了推荐算法的常用工具。org.apache.spark.mllib.recommendation中提供了3个用于协同过滤推荐的数据类型。
- Rating:Rating对象是一个用户、项目和评分的三元组。
- ALS:ALS提供了求解带偏置矩阵分解的交替最小二乘算法(Alternating LeastSquares,ALS)。
- MatrixFactorizationModel:ALS求解矩阵分解返回的结果类型。
四、数据集
使用movieLen数据集,具体描述参考之前博客:https://blog.csdn.net/tonydz0523/article/details/120624474?spm=1001.2014.3001.5501
五、通过协同过滤实现电影推荐
使用ALS算法求解矩阵分解时,需要设定3个参数:矩阵分解的秩rank、正则系数alpha和迭代次数numIters。为了确定最佳的模型参数,将数据集划分为3个部分:训练集、验证集和测试集。训练集是用来训练多个协同过滤模型,验证集从中选择出均方误差最小的模型,测试集用来验证最佳模型的预测准确率。
矩阵分解的秩从812中选择,正则系数从1.010.0中选择,迭代次数从10~20中选择
/**
* @author: ffzs
* @Date: 2021/10/14 上午10:12
*/
object MovieLenML {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
// sparkSession
val spark = SparkSession.builder
.appName("StructuredStreamingExample")
.master("local[*]")
.getOrCreate()
val sc = spark.sparkContext
val dataPath = "/home/ffzs/data/ml-1m"
val ratingsRdd = sc.textFile(f"$dataPath/ratings.dat")
val movieRdd = sc.textFile(f"$dataPath/movies.dat")
val rating = ratingsRdd.map(_.split("::"))
.map(it => (it(3).toLong % 10, Rating(it(0).toInt, it(1).toInt, it(2).toLong)))
val movies = movieRdd.map(_.split("::"))
.map(it => (it(0).toInt, it(1)))
.collect().toMap
// 切分训练集和测试集
val training = rating
.filter(x => x._1 < 6)
.values
.repartition(4)
.cache()
// 切分验证集
val validation = rating.filter(x => x._1>=6 && x._1<8)
.values
.repartition(4)
.cache()
// 切分测试集
val test = rating.filter(_._1 >= 8).values.cache()
val ranks = List(8, 12)
val lambdas = List(1.0, 10.0)
val numIters = List(10, 20)
var bestModel: Option[MatrixFactorizationModel] = None
var bestValidationRmse = Double.MaxValue
var bestRank = 0
var bestLambda = -1.0
var bestNumIter = -1
for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {
val model = ALS.train(training, rank, numIter, lambda)
val validationRmse = rmse(model, validation)
if (validationRmse < bestValidationRmse) {
bestModel = Some(model)
bestValidationRmse = validationRmse
bestRank = rank
bestLambda = lambda
bestNumIter = numIter
}
}
val testRmse = rmse(bestModel.get, test)
println(f"当rank:$bestRank,lambda:$bestLambda,numIter:$bestNumIter 获取最小的rmse,模型为最优模型," +
f"此时测试集的rmse为:$testRmse%.2f")
// 推荐
val userId = 10;
val thePersonMovieIds = ratingsRdd.map(_.split("::"))
.filter(_(0).toInt == userId)
.map(_(1).toInt).collect().toSet
val cands = sc.parallelize(movies.keys.filter(!thePersonMovieIds.contains(_)).toSeq)
val recommendations = bestModel.get.predict(cands.map(it => (userId, it))).collect()
.sortBy(- _.rating)
.take(10)
var i = 1
recommendations.foreach { it =>
println(f"$i%d : ${movies(it.product)}")
i += 1
}
// 你也可以通过自带的recommendProducts对用户进行商品的推荐
i = 1
bestModel.get.recommendProducts(userId, 10).foreach { it =>
println(f"$i%d : ${movies(it.product)}")
i += 1
}
}
def rmse(model: MatrixFactorizationModel, data: RDD[Rating]): Double = {
val pred: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
val predAndRating = pred.map{x => ((x.user, x.product), x.rating)}
.join(data.map(it => ((it.user, it.product), it.rating))).values
math.sqrt(predAndRating.map(x => math.pow((x._1 - x._2), 2)).mean())
}
}
输出结果: