Spark实现朴素贝叶斯
关于贝叶斯的介绍在之前的文章中也有说明,网上也有许多资料,在这里就不在做过多赘述。
朴素贝叶斯模型
假设我们有数据样本如下:
(X1,X2,..Xn,Y)
有m个样本,每个样本有n个特征,特征输出有K个类别
我们可以通过以上样本学习得出先验概率:
P(Y=ck),k=1,2...k
然后学习得出条件概率:
P(X=x∣Y=ck)=P(X(1)=x(1),X(2)=x(2),X(n)=x(n)∣Y=ck))
朴素贝叶斯之所以朴素是因为对条件概率分布做了条件独立性假设(即用于分类的特征在类确定的条件下都是独立的),具体表现为:
P(X=x∣Y=ck)=P(X(1)=x(1),X(2)=x(2),X(n)=x(n)∣Y=ck))=i=1∏NP(X(i)=x(i)∣Y=ck)
朴素贝叶斯的这一假设大大简化了条件分布的计算,但是也牺牲了分类的准确性,因此,在特征之间非常不独立的情况下,可以优先考虑其他分类算法。
在给定的特征向量X=x的情况下,通过学习得到后验概率分布:
P(Y=ck∣X=x)
根据贝叶斯定量整理得出:
P(Y=ck∣X=x)=∑k=1NP(Y=ck)∏i=1NP(X(i)=x(i)∣Y=ck)∏i=1NP(X(i)=x(i)∣Y=ck)P(Y=ck)
将后验概率最大的类最X=x的输出。
以上过程是朴素贝叶斯的极大似然估计方法,在实际计算中,为了防止计算出现概率值为0的情况,我们在计算先验概率和条件概率时加入一个拉普拉斯平滑指数 λ(λ>=0)。(贝叶斯估计)
先验概率的贝叶斯估计:
Pλ(Y=ck)=N+kλ∑k=1NI(yi=ck)+λ
条件概率的贝叶斯估计:
Pλ(X(j)=ajl∣Y=ck)=∑k=1NI(yi=ck)+Sjλ∑k=1NI((X(j)=ajl,yi=ck)+λ
跟多解释和证明请参考李航老师的《统计学习方法》
数据说明
本案例数据来自于MNIST数据库,是一份手写数字数据库。
实现代码
// 数据加载
val mnistTrain = spark.read
.option("header", true)
.option("inferSchema", true)
.csv("F:\\DataSource\\data\\mnist\\train.csv")
// Schema信息
val ftSchemas = mnistTrain.schema.map(_.name).filterNot(_ == "label")
// 拉普拉斯平滑指数
val lamada = 1.0
// 数据格式变换
val flattenDF = mnistTrain
.flatMap(row => {
val label = row.getInt(0)
(1 until row.length).map(i => {
(label, ftSchemas(i - 1), row.getInt(i))
})
})
.toDF("label", "ftsName", "ftsValue")
// 分组计算每个分类的各个特征出现的频次
val grouped = flattenDF
.groupBy($"label", $"ftsName", $"ftsValue")
.agg(count($"ftsValue") as "ftsFreq")
.persist()
//计算每个特征的数量
val ftsLevels = grouped
.groupBy($"ftsName")
.agg(countDistinct($"ftsValue") as "ftsLevels")
// 计算每类的数量
val labelLevels = grouped
.where($"ftsName" === ftSchemas.head)
.groupBy($"label")
.agg(sum($"ftsFreq") as "ftsCounts")
grouped.unpersist()
//分类数
val numLabels: Double = labelLevels.count().toDouble
//样本量
val numSample: Double = labelLevels.rdd
.map(_.getLong(1).toDouble).collect().sum
// 拉普拉斯变换
val lpsamples: Double = numSample + lamada * numLabels
// 计算先验概率和每类各个特征的拉普拉斯变换后的数量
val pprobAndlp = labelLevels
.crossJoin(ftsLevels)
.withColumn("pprob", log(($"ftsCounts" + lamada) / lpsamples))
.withColumn("lp", $"ftsCounts" + $"ftsLevels" * lamada)
.drop("ftsCounts", "ftsLevels")
// 取对数后的先验概率
val pprobDF = pprobAndlp.select($"label", $"pprob").distinct()
val ftsLevelLpsDF = pprobAndlp.select($"label", $"ftsName", $"lp")
// 计算条件概率
val cprobDF = grouped
.join(ftsLevels, "ftsName")
.join(labelLevels, "label")
.select( $"label", $"ftsName", $"ftsValue",
($"ftsFreq" + lamada) / ($"ftsLevels" + $"ftsCounts") as "cprob" )
// 条件概率格式转换
val cprob: RDD[(String, (Map[Int, Double], Int))] = cprobDF.rdd
.map(row => {
val label = row.getInt(0)
val ftsName = row.getString(1)
val ftsValue = row.getInt(2)
val logcProb = row.getDouble(3)
((label, ftsName), Map(ftsValue -> logcProb))
})
.reduceByKey(_ ++ _)
.map {
case ((label, features), cprobs) =>
(features, (cprobs, label))
}
// ########################################预测####################################### //
// 将条件概率的数据广播
val cprobBroad = sc.broadcast(cprob.collect())
// 先验概率
val priorProbability = pprobDF
.map(row => {
row.getAs[Int]("label") -> row.getAs[Double]("pprob")
}).collect() .toMap
// 每一类的各个特征个数
val ftsLevelLps: Map[(Int, String), Double] = ftsLevelLpsDF
.map(row => {
val lb = row.getAs[Int]("label")
val ftsName = row.getAs[String]("ftsName")
val lp = row.getAs[Double]("lp")
(lb, ftsName) -> lp
}).collect() .toMap
val predict = mnistTrain.rdd.map(row => {
// (label, prob)
val labelAndProb: (Int, Double) = (1 until row.length)
.map(i => {
val observations = row.getInt(i) //第i个特征的观测值
val cpCompute: Array[(Int, Double)] = cprobBroad.value
.filter(_._1 == ftSchemas(i - 1))
.map(tps => {
// 特征i的条件概率Map
val cpMap: Map[Int, Double] = tps._2._1
// 拉普拉斯平滑之后的条件概率,防止概率为0的计算情况
val missFtscProb: Double = lamada / ftsLevelLps
.getOrElse((tps._2._2, tps._1), lamada)
//观测值的条件概率
val maybeDouble = cpMap.get(observations)
val cp: Double = maybeDouble match {
case None => missFtscProb
case _ => maybeDouble.head
}
(tps._2._2, math.log(cp))
})
cpCompute
})
.flatMap(_.toSeq)
.groupBy(_._1)
.mapValues(_.map(_._2).sum)
.map(tp => {
//在对概率取对数之后,由原来的连乘变为连加
tp._1 -> (priorProbability.getOrElse(tp._1, 0.0) + tp._2)
})
.maxBy(_._2) // 选择对数后验概率最大的类
Row.merge(row, Row.fromTuple(labelAndProb))
})
// 预测结果数据的Schema信息
val newSchema = mnistTrain.schema
.add("predict", IntegerType)
.add("lprob", DoubleType)
// 将预测结果转化为DataFrame
val predictdf = spark
.createDataFrame(predict, newSchema)
.withColumn("label", $"label".cast(DoubleType)) //在计算正确率、召回率等指标的时候需要字段为double类型
.withColumn("predict", $"predict".cast(DoubleType))
// 预测结果展示,因为数据量比较大,特征数多,计算的时候概率连乘结果会很小,由于计算语言或计算机精度的问题,对于特别小的数值会输出为0,所以计算结果的概率都以对数形式给出。
predictdf
.select($"label", $"predict", $"lprob")
.show(truncate = false)
结果查看:从训练的结果看,对于分类的准确率还是比较高的
+-----+-------+-------------------+
|label|predict|lprob |
+-----+-------+-------------------+
|0.0 |0.0 |-1095.5671559582588|
|1.0 |1.0 |-556.7921217594796 |
|2.0 |2.0 |-1014.0486603224243|
|8.0 |8.0 |-1036.2092625180194|
|6.0 |6.0 |-922.0631178117894 |
|8.0 |8.0 |-1091.188014061134 |
|1.0 |1.0 |-728.4240686145088 |
|1.0 |1.0 |-564.6324122327153 |
|7.0 |7.0 |-795.1048199158151 |
|1.0 |1.0 |-628.7862373816038 |
朴素贝叶斯的其他形式
本案例中实现的适用于多项式分布的情况,即数据的特征由一个多项式分布生成;另外还有伯努利朴素贝叶斯和高斯朴素贝叶斯。
1.伯努利朴素贝叶斯,假设特征分别为二元伯努利分布:
P(Xj=xjl)=P(j∣Y=ck)xjl+(1−P(j∣Y=ck)(1−xjl)
其中 Xjl只能取值0和1
2.高斯朴素贝叶斯,假设特征都服从简单的正态分布:
P(X=xj∣Y=ck)=2πδk21exp(−2δk2(xj−uk)2)
由于作者水平有限,在介绍及实现过程中难免有纰漏之处,欢迎细心的朋友指正
参考资料:
《统计学习方法》-- 李航