Spark实现朴素贝叶斯

Spark实现朴素贝叶斯

关于贝叶斯的介绍在之前的文章中也有说明,网上也有许多资料,在这里就不在做过多赘述。

朴素贝叶斯模型

假设我们有数据样本如下:
(X1,X2,..Xn,Y) (X_1,X_2,..X_n,Y)(X1​,X2​,..Xn​,Y)
有m个样本,每个样本有n个特征,特征输出有K个类别
我们可以通过以上样本学习得出先验概率
P(Y=ck),k=1,2...k P(Y=c_k),k=1,2...kP(Y=ck​),k=1,2...k
然后学习得出条件概率
P(X=xY=ck)=P(X(1)=x(1),X(2)=x(2),X(n)=x(n)Y=ck)) P(X=x|Y=c_k) =P(X^{(1)}=x^{(1)}, X^{(2)}=x^{(2)},X^{(n)}=x^{(n)}|Y=c_k))P(X=x∣Y=ck​)=P(X(1)=x(1),X(2)=x(2),X(n)=x(n)∣Y=ck​))
朴素贝叶斯之所以朴素是因为对条件概率分布做了条件独立性假设(即用于分类的特征在类确定的条件下都是独立的),具体表现为:
P(X=xY=ck)=P(X(1)=x(1),X(2)=x(2),X(n)=x(n)Y=ck))=i=1NP(X(i)=x(i)Y=ck) P(X=x|Y=c_k) =P(X^{(1)}=x^{(1)}, X^{(2)}=x^{(2)},X^{(n)}=x^{(n)}|Y=c_k))= \prod_{i=1}^N P(X^{(i)}=x^{(i)}|Y=c_k)P(X=x∣Y=ck​)=P(X(1)=x(1),X(2)=x(2),X(n)=x(n)∣Y=ck​))=i=1∏N​P(X(i)=x(i)∣Y=ck​)
朴素贝叶斯的这一假设大大简化了条件分布的计算,但是也牺牲了分类的准确性,因此,在特征之间非常不独立的情况下,可以优先考虑其他分类算法。
在给定的特征向量X=x的情况下,通过学习得到后验概率分布:
P(Y=ckX=x)P(Y=c_k|X=x)P(Y=ck​∣X=x)
根据贝叶斯定量整理得出:
P(Y=ckX=x)=i=1NP(X(i)=x(i)Y=ck)P(Y=ck)k=1NP(Y=ck)i=1NP(X(i)=x(i)Y=ck)P(Y=c_k|X=x) =\frac{ \prod_{i=1}^N P(X^{(i)}=x^{(i)}|Y=c_k)P(Y=c_k)}{\sum_{k=1}^N P(Y=c_k) \prod_{i=1}^N P(X^{(i)}=x^{(i)}|Y=c_k)} P(Y=ck​∣X=x)=∑k=1N​P(Y=ck​)∏i=1N​P(X(i)=x(i)∣Y=ck​)∏i=1N​P(X(i)=x(i)∣Y=ck​)P(Y=ck​)​
将后验概率最大的类最X=x的输出。

以上过程是朴素贝叶斯的极大似然估计方法,在实际计算中,为了防止计算出现概率值为0的情况,我们在计算先验概率和条件概率时加入一个拉普拉斯平滑指数 λ(λ>=0)。(贝叶斯估计)
先验概率的贝叶斯估计:
Pλ(Y=ck)=k=1NI(yi=ck)+λN+kλP_λ(Y=c_k) = \frac{\sum_{k=1}^N I(y_i=c_k)+λ}{N+kλ} Pλ​(Y=ck​)=N+kλ∑k=1N​I(yi​=ck​)+λ​
条件概率的贝叶斯估计:
Pλ(X(j)=ajlY=ck)=k=1NI((X(j)=ajl,yi=ck)+λk=1NI(yi=ck)+Sjλ P_λ(X^{(j)}=a_{jl} |Y=c_k) =\frac{\sum_{k=1}^N I((X^{(j)}=a_{jl} ,y_i=c_k)+λ}{\sum_{k=1}^N I(y_i=c_k)+S_jλ} Pλ​(X(j)=ajl​∣Y=ck​)=∑k=1N​I(yi​=ck​)+Sj​λ∑k=1N​I((X(j)=ajl​,yi​=ck​)+λ​

跟多解释和证明请参考李航老师的《统计学习方法》

数据说明

本案例数据来自于MNIST数据库,是一份手写数字数据库。

实现代码

   // 数据加载
    val mnistTrain = spark.read
      .option("header", true)
      .option("inferSchema", true)
      .csv("F:\\DataSource\\data\\mnist\\train.csv")

    // Schema信息
    val ftSchemas = mnistTrain.schema.map(_.name).filterNot(_ == "label")

    // 拉普拉斯平滑指数
    val lamada = 1.0

    // 数据格式变换  
    val flattenDF = mnistTrain
      .flatMap(row => {
        val label = row.getInt(0)
        (1 until row.length).map(i => {
          (label, ftSchemas(i - 1), row.getInt(i))
        })
      })
      .toDF("label", "ftsName", "ftsValue")


    // 分组计算每个分类的各个特征出现的频次
    val grouped = flattenDF
      .groupBy($"label", $"ftsName", $"ftsValue")
      .agg(count($"ftsValue") as "ftsFreq")
      .persist()
 
    //计算每个特征的数量
    val ftsLevels = grouped
      .groupBy($"ftsName")
      .agg(countDistinct($"ftsValue") as "ftsLevels")
 
    // 计算每类的数量
    val labelLevels = grouped
      .where($"ftsName" === ftSchemas.head)
      .groupBy($"label")
      .agg(sum($"ftsFreq") as "ftsCounts")
 
    grouped.unpersist()

    //分类数 
    val numLabels: Double = labelLevels.count().toDouble

    //样本量
    val numSample: Double = labelLevels.rdd
      .map(_.getLong(1).toDouble).collect().sum

    // 拉普拉斯变换
    val lpsamples: Double = numSample + lamada * numLabels

    // 计算先验概率和每类各个特征的拉普拉斯变换后的数量
    val pprobAndlp = labelLevels
      .crossJoin(ftsLevels)
      .withColumn("pprob", log(($"ftsCounts" + lamada) / lpsamples))
      .withColumn("lp", $"ftsCounts" + $"ftsLevels" * lamada)
      .drop("ftsCounts", "ftsLevels")

    // 取对数后的先验概率
    val pprobDF = pprobAndlp.select($"label", $"pprob").distinct()

    val ftsLevelLpsDF = pprobAndlp.select($"label", $"ftsName", $"lp")
 
    // 计算条件概率
    val cprobDF = grouped
      .join(ftsLevels, "ftsName")
      .join(labelLevels, "label")
      .select(  $"label",  $"ftsName", $"ftsValue",
       ($"ftsFreq" + lamada) / ($"ftsLevels" + $"ftsCounts") as "cprob"  )
 
   // 条件概率格式转换
    val cprob: RDD[(String, (Map[Int, Double], Int))] = cprobDF.rdd
      .map(row => {
        val label = row.getInt(0)
        val ftsName = row.getString(1)
        val ftsValue = row.getInt(2)
        val logcProb = row.getDouble(3)
        ((label, ftsName), Map(ftsValue -> logcProb))
      })
      .reduceByKey(_ ++ _)
      .map {
        case ((label, features), cprobs) =>
          (features, (cprobs, label))
      }

    // ########################################预测####################################### //
    
    
    // 将条件概率的数据广播
    val cprobBroad = sc.broadcast(cprob.collect())

    // 先验概率
    val priorProbability = pprobDF
      .map(row => {
        row.getAs[Int]("label") -> row.getAs[Double]("pprob")
      }).collect() .toMap
   
   // 每一类的各个特征个数
    val ftsLevelLps: Map[(Int, String), Double] = ftsLevelLpsDF
      .map(row => {
        val lb = row.getAs[Int]("label")
        val ftsName = row.getAs[String]("ftsName")
        val lp = row.getAs[Double]("lp")
        (lb, ftsName) -> lp
      }).collect() .toMap

    val predict = mnistTrain.rdd.map(row => {
      //  (label, prob)
      val labelAndProb: (Int, Double) = (1 until row.length)
        .map(i => {
          val observations = row.getInt(i) //第i个特征的观测值
          val cpCompute: Array[(Int, Double)] = cprobBroad.value
            .filter(_._1 == ftSchemas(i - 1))
            .map(tps => {
              // 特征i的条件概率Map
              val cpMap: Map[Int, Double] = tps._2._1
              // 拉普拉斯平滑之后的条件概率,防止概率为0的计算情况
              val missFtscProb: Double = lamada / ftsLevelLps
                .getOrElse((tps._2._2, tps._1), lamada)
              //观测值的条件概率
              val maybeDouble = cpMap.get(observations)
              val cp: Double = maybeDouble match {
                case None => missFtscProb
                case _    => maybeDouble.head
              }
              (tps._2._2, math.log(cp)) 
            })
            
          cpCompute
        })
        .flatMap(_.toSeq)
        .groupBy(_._1)
        .mapValues(_.map(_._2).sum)
        .map(tp => {
         //在对概率取对数之后,由原来的连乘变为连加 
          tp._1 -> (priorProbability.getOrElse(tp._1, 0.0) + tp._2) 
        })
        .maxBy(_._2)  // 选择对数后验概率最大的类
      Row.merge(row, Row.fromTuple(labelAndProb))
    })

   // 预测结果数据的Schema信息
    val newSchema = mnistTrain.schema
      .add("predict", IntegerType)
      .add("lprob", DoubleType)

    // 将预测结果转化为DataFrame
    val predictdf = spark
      .createDataFrame(predict, newSchema)
      .withColumn("label", $"label".cast(DoubleType))  //在计算正确率、召回率等指标的时候需要字段为double类型
      .withColumn("predict", $"predict".cast(DoubleType))
 
   // 预测结果展示,因为数据量比较大,特征数多,计算的时候概率连乘结果会很小,由于计算语言或计算机精度的问题,对于特别小的数值会输出为0,所以计算结果的概率都以对数形式给出。
    predictdf
      .select($"label", $"predict", $"lprob")
      .show(truncate = false)


结果查看:从训练的结果看,对于分类的准确率还是比较高的

+-----+-------+-------------------+
|label|predict|lprob              |
+-----+-------+-------------------+
|0.0  |0.0    |-1095.5671559582588|
|1.0  |1.0    |-556.7921217594796 |
|2.0  |2.0    |-1014.0486603224243|
|8.0  |8.0    |-1036.2092625180194|
|6.0  |6.0    |-922.0631178117894 |
|8.0  |8.0    |-1091.188014061134 |
|1.0  |1.0    |-728.4240686145088 |
|1.0  |1.0    |-564.6324122327153 |
|7.0  |7.0    |-795.1048199158151 |
|1.0  |1.0    |-628.7862373816038 |

朴素贝叶斯的其他形式

本案例中实现的适用于多项式分布的情况,即数据的特征由一个多项式分布生成;另外还有伯努利朴素贝叶斯和高斯朴素贝叶斯。
1.伯努利朴素贝叶斯,假设特征分别为二元伯努利分布:
P(Xj=xjl)=P(jY=ck)xjl+(1P(jY=ck)(1xjl)P(X_j=x_{jl})=P(j|Y=c_k)x_{jl} + (1-P(j|Y=c_k)(1-x_{jl}) P(Xj​=xjl​)=P(j∣Y=ck​)xjl​+(1−P(j∣Y=ck​)(1−xjl​)
其中 Xjl只能取值0和1

2.高斯朴素贝叶斯,假设特征都服从简单的正态分布:

P(X=xjY=ck)=12πδk2exp((xjuk)22δk2)P(X=x_j|Y=c_k) = \frac{1}{\sqrt{2\pi\boldsymbol{\delta^{2}_k}}}exp(- \frac{(x_j-u_k)^{2}}{2\boldsymbol{\delta^{2}_k}})P(X=xj​∣Y=ck​)=2πδk2​​1​exp(−2δk2​(xj​−uk​)2​)

由于作者水平有限,在介绍及实现过程中难免有纰漏之处,欢迎细心的朋友指正

参考资料:

《统计学习方法》-- 李航

https://www.cnblogs.com/pinard/p/6069267.html

上一篇:AVR单片机熔丝位问题


下一篇:IDL