SparkMLlib的线性回归和逻辑回归

分类与回归

问题类型 解决方法
二元分类 线性SVM,逻辑回归,决策树,随机森林,梯度提升树,朴素贝叶斯
多元分类 逻辑回归,决策树,随机森林,朴素贝叶斯
回归问题 线性最小二乘法,套索,岭回归,决策树,随机森林,梯度提升树,等渗回归

线性方法

线性向量机(SVM)

在Spark MLlib中线性SVM使用的是L2正则化,当然也支持L1正则化。线性SVM是大规模分类任务的一个标准方法。下面的代码片段说明了如何加载一个样本数据集,使用算法对象中的静态方法在该训练数据上执行训练算法,并使用所产生的模型进行预测以计算训练误差。

package com.test.SVMsTest

import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD


object Main {
  def main(args: Array[String]): Unit = {
    //TODO 创建环境
    val conf: SparkConf = new SparkConf().setAppName("SVMWithSGDExample").setMaster("local[*]")
    val sc = new SparkContext(conf)
    //TODO 数据操作
    val dataPath = "/Volumes/Data/All_code/Java_code/Data/data/mllib/sample_libsvm_data.txt"
    val data: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, dataPath)

    val splits: Array[RDD[LabeledPoint]] = data.randomSplit(Array(0.6, 0.4), seed = 11L)
    val training: RDD[LabeledPoint] = splits(0).cache()
    val test: RDD[LabeledPoint] = splits(1)

    val numIterations = 100
    val model: SVMModel = SVMWithSGD.train(training, numIterations)

    model.clearThreshold()
    val scoreAndLabels: RDD[(Double, Double)] = test.map { point => {
      val score: Double = model.predict(point.features)
      (score, point.label)
    }
    }

    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    val auROC: Double = metrics.areaUnderROC()

    print(s"Area under ROC = $auROC")

    //存储模型
    model.save(sc, "target/scalaModel")
    //加载模型
    val sameModel: SVMModel = SVMModel.load(sc, "target/scalaModel")
    //TODO 关闭环境

    sc.stop()
  }
}

SVMWithSGD.train()方法默认执行L2正则化,正则化参数设置为1.0。如果想要配置这个算法,我们可以通过直接创建一个新对象并调用setter方法来进一步定制SVMWithSGD。下面的代码产生额一个L1正则化的SVMs变体.正则化参数设置为0.1,并设置了200次迭代的训练算法。

    val svmAlg = new SVMWithSGD()
    svmAlg.optimizer
      .setNumIterations(200)
      .setRegParam(0.1)
      .setUpdater(new L1Updater)
    val modelL1: SVMModel = svmAlg.run(training)

逻辑回归

逻辑回归被广泛用于预测二元分类。在Spark MLlib实现两种逻辑回归:小批量梯度下降法, L-BFGS。我们更加推荐L-BFGS,因为它收敛更快。下面的代码说明了如何加载一个多类数据集样本,将其分为训练集和测试集,并使用LogisticRegressionWithLBFGS来拟合一个逻辑回归模型。然后针对测试数据集对模型进行评估并保存到磁盘。


object Main {
  def main(args: Array[String]): Unit = {
    //TODO 创建环境
    val conf: SparkConf = new SparkConf().setAppName("appName").setMaster("local[*]")
    val sc = new SparkContext(conf)
    //TODO 数据操作
    // 加载训练数据集
    val data: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "/Volumes/Data/All_code/Java_code/Data/data/mllib/sample_libsvm_data.txt")
    // 划分数据集
    val splits: Array[RDD[LabeledPoint]] = data.randomSplit(Array(0.6, 0.4), seed = 11L)
    val training: RDD[LabeledPoint] = splits(0).cache()
    val test: RDD[LabeledPoint] = splits(1)

    //创建模型,训练模型
    val model = new LogisticRegressionWithLBFGS()
      .setNumClasses(10)    //分10类
      .run(training)

    //计算测试集
    val predictionAndLabels: RDD[(Double, Double)] = test.map(point => {
      val prediction: Double = model.predict(point.features)
      (prediction, point.label)
    })

    //获取评估指数
    val metrics = new MulticlassMetrics(predictionAndLabels)
    val acc: Double = metrics.accuracy
    println(s"Accuracy = $acc")

    //存储模型
    //model.save(sc, "target/Logistic")
    //TODO 关闭环境
    sc.stop()
  }
}

回归

线性最小二乘法

各种相关的回归方法是通过使用不同类型的正则化而得来的:最小二乘法或普通最小二乘法不使用正则化。岭回归使用L2正则化;拉索回归使用L1正则化。

拉索回归

岭回归

流式线性回归

当数据以流的方式达到时,在线拟合回归模型是有用的,当心的数据到达更新模型的参数。在线拟合和离线拟合类似,只是拟合发生在每一批数据上,这样的模型就会持续更新反应流数据中的规律。通过将权重初始化为0来创建我们的模型,并为训练和测试数据流,然后开始工作。将预测结果与真实标签一起打印出来,让我们很容易看到结果。

上一篇:Spark算子介绍和比较


下一篇:关于Spark默认并行度spark.default.parallelism的理解