spark进阶(十):使用MLlib进行协同过滤电影推荐

一、MLlib简介

MLlib是一些常用的机器学习算法和库在Spark平台上的实现。MLlib是AMPLab的在研机器学习项目MLBase的底层组件。MLBase是一个机器学习平台,MLI是一个接口层,提供很多结构,MLlib是底层算法实现层。

MLlib中包含分类与回归、聚类、协同过滤、数据降维组件以及底层的优化库。

MLlib底层使用到了Scala书写的线性代数库Breeze,Breeze底层依赖netlib-java库。netlib-java底层依赖原生的Fortran routines。

二、协同过滤推荐算法

协同过滤算法(collaborative filtering,CF)是推荐系统中应用最广泛的推荐算法之一。协同过滤的实质,就是通过预测用户-物品矩阵中缺失的评分,来预测用户对物品的偏好。更加具体地,协同过滤算法主要分为memory-based CF和Model-based CF,而memory-based CF包括User-based CF和Item-based CF。

矩阵分解

常用的协同过滤矩阵分解算法包括如下3种:

  • 奇异值分解(Singular Value Decomposition,SVD)SVD将用户-项目评分矩阵R直接分解为用户特征矩阵U、奇异值矩阵Σ和物品特征矩阵V的乘积。若采用奇异值分解,需要首先填充用户-项目评分矩阵。显然,这样会造成了两个问题:其一,填充大大增加了数据量,增加了算法复杂度;其二,简单粗暴的数据填充很容易造成数据失真。如果不填充评分矩阵,将空值设置为0,那么奇异值分解又会造成过学习的问题。
  • 正则化矩阵分解(Regularized Matrix Factorization)因为矩阵分解的常用评价指标是均方根误差(Root Mean Squared Error,RMSE),那么可以直接最小化RMSE学习用户特征矩阵U和物品特征V,并加入一个正则化项来避免过拟合。
  • 带偏置的矩阵分解(Biased Matrix Factorization)于宽容的用户来说,它的评分行为普遍偏高,而对挑剔的用户来说,他的评分记录普遍偏低,即使他们对同一物品的评分相同,但是他们对该物品的喜好程度却并不一样。把这些独立于用户或独立于物品的因素称为偏置(Bias)部分,将用户和物品的交互即用户对物品的喜好部分称为个性化部分。

三、MLlib的推荐算法工具

MLlib是Spark中用于机器学习的强大工具包。协同过滤推荐是MLlib提供的核心功能之一,在Spark的内置包org.apache.spark.mllib.recommendation是集成了推荐算法的常用工具。org.apache.spark.mllib.recommendation中提供了3个用于协同过滤推荐的数据类型。

  • Rating:Rating对象是一个用户、项目和评分的三元组。
  • ALS:ALS提供了求解带偏置矩阵分解的交替最小二乘算法(Alternating LeastSquares,ALS)。
  • MatrixFactorizationModel:ALS求解矩阵分解返回的结果类型。

四、数据集

使用movieLen数据集,具体描述参考之前博客:https://blog.csdn.net/tonydz0523/article/details/120624474?spm=1001.2014.3001.5501

五、通过协同过滤实现电影推荐

使用ALS算法求解矩阵分解时,需要设定3个参数:矩阵分解的秩rank、正则系数alpha和迭代次数numIters。为了确定最佳的模型参数,将数据集划分为3个部分:训练集、验证集和测试集。训练集是用来训练多个协同过滤模型,验证集从中选择出均方误差最小的模型,测试集用来验证最佳模型的预测准确率。

矩阵分解的秩从812中选择,正则系数从1.010.0中选择,迭代次数从10~20中选择

/**
 * @author: ffzs
 * @Date: 2021/10/14 上午10:12
 */
object MovieLenML {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)

    // sparkSession
    val spark = SparkSession.builder
      .appName("StructuredStreamingExample")
      .master("local[*]")
      .getOrCreate()

    val sc = spark.sparkContext

    val dataPath = "/home/ffzs/data/ml-1m"

    val ratingsRdd = sc.textFile(f"$dataPath/ratings.dat")
    val movieRdd = sc.textFile(f"$dataPath/movies.dat")

    val rating = ratingsRdd.map(_.split("::"))
      .map(it => (it(3).toLong % 10, Rating(it(0).toInt, it(1).toInt, it(2).toLong)))
    val movies = movieRdd.map(_.split("::"))
      .map(it => (it(0).toInt, it(1)))
      .collect().toMap

    // 切分训练集和测试集
    val training = rating
      .filter(x => x._1 < 6)
      .values
      .repartition(4)
      .cache()
    // 切分验证集
    val validation = rating.filter(x => x._1>=6 && x._1<8)
      .values
      .repartition(4)
      .cache()
    // 切分测试集
    val test = rating.filter(_._1 >= 8).values.cache()

    val ranks = List(8, 12)
    val lambdas = List(1.0, 10.0)
    val numIters = List(10, 20)
    var bestModel: Option[MatrixFactorizationModel] = None
    var bestValidationRmse = Double.MaxValue
    var bestRank = 0
    var bestLambda = -1.0
    var bestNumIter = -1
    for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {
      val model = ALS.train(training, rank, numIter, lambda)
      val validationRmse = rmse(model, validation)
      if (validationRmse < bestValidationRmse) {
        bestModel = Some(model)
        bestValidationRmse = validationRmse
        bestRank = rank
        bestLambda = lambda
        bestNumIter = numIter
      }
    }
    val testRmse = rmse(bestModel.get, test)
    println(f"当rank:$bestRank,lambda:$bestLambda,numIter:$bestNumIter 获取最小的rmse,模型为最优模型," +
      f"此时测试集的rmse为:$testRmse%.2f")

    // 推荐
    val userId = 10;
    val thePersonMovieIds = ratingsRdd.map(_.split("::"))
      .filter(_(0).toInt == userId)
      .map(_(1).toInt).collect().toSet

    val cands = sc.parallelize(movies.keys.filter(!thePersonMovieIds.contains(_)).toSeq)
    val recommendations = bestModel.get.predict(cands.map(it => (userId, it))).collect()
      .sortBy(- _.rating)
      .take(10)

    var i = 1
    recommendations.foreach { it =>
      println(f"$i%d : ${movies(it.product)}")
      i += 1
    }

    // 你也可以通过自带的recommendProducts对用户进行商品的推荐
    i = 1
    bestModel.get.recommendProducts(userId, 10).foreach { it =>
      println(f"$i%d : ${movies(it.product)}")
      i += 1
    }
  }

  def rmse(model: MatrixFactorizationModel, data: RDD[Rating]): Double = {
    val pred: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))

    val predAndRating = pred.map{x => ((x.user, x.product), x.rating)}
      .join(data.map(it => ((it.user, it.product), it.rating))).values

    math.sqrt(predAndRating.map(x => math.pow((x._1 - x._2), 2)).mean())
  }
}

输出结果:

spark进阶(十):使用MLlib进行协同过滤电影推荐

上一篇:如何利用 notedown 完成 ipynb与markdown之间的格式转换?


下一篇:1.Spark ML学习笔记—Spark MLlib 与 Spark ML、Pipelines 的主要概念