Python + Spark 2.0 + Hadoop Study Notes — Binary Classification with Logistic Regression in Python Spark MLlib

As with the previous part, this note deals with a binary classification problem, for which logistic regression is another commonly used method. Logistic regression builds its classifier around a sigmoid function, which maps a linear score to a probability. The sections below show how to implement logistic regression with MLlib in PySpark.
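Before turning to MLlib, here is a minimal plain-Python sketch of the idea: the sigmoid squashes a linear score w·x + b into a probability in (0, 1), which is then thresholded at 0.5. The weights and inputs below are made up purely for illustration.

import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def predict_prob(weights, x, intercept=0.0):
    # linear score, then squashed into (0, 1) by the sigmoid
    z = sum(w * xi for w, xi in zip(weights, x)) + intercept
    return sigmoid(z)

# classify as 1 (positive class) if the probability exceeds 0.5
print(1 if predict_prob([0.8, -0.3], [1.0, 2.0]) > 0.5 else 0)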

Step 1: Import the required libraries

import sys
from time import time
import pandas as pd
import matplotlib.pyplot as plt
from pyspark import SparkConf, SparkContext
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint
import numpy as np
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.feature import StandardScaler

Step 2: Prepare the data. Unlike the earlier tree models, logistic regression needs feature standardization, because the numeric columns of the input data span very different ranges; this is done with StandardScaler() (a standalone sketch of StandardScaler follows PrepareData below).

def get_mapping(rdd, idx):
    return rdd.map(lambda fields: fields[idx]).distinct().zipWithIndex().collectAsMap()

def extract_label(record):
    label = record[-1]
    return float(label)

def extract_features(field, categoriesMap, featureEnd):
    # one-hot encode the category found in column 3
    categoryIdx = categoriesMap[field[3]]
    categoryFeatures = np.zeros(len(categoriesMap))
    categoryFeatures[categoryIdx] = 1
    # convert the remaining numeric columns, treating "?" as 0
    numericalFeatures = [convert_float(x) for x in field[4:featureEnd]]
    return np.concatenate((categoryFeatures, numericalFeatures))

def convert_float(x):
    return 0 if x == "?" else float(x)
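As a quick check of what these helpers produce, here is a made-up example (the category map and field values are invented for illustration): the category in column 3 becomes a one-hot vector, and "?" in a numeric column becomes 0.

categoriesMap = {"business": 0, "recreation": 1, "sports": 2}
field = ["url", "boilerplate", "x", "recreation", "0.45", "?", "1"]
print(extract_features(field, categoriesMap, len(field) - 1))
# -> [0.   1.   0.   0.45 0.  ]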

def PrepareData(sc):
    print("Data loading...")
    rawDataWithHeader = sc.textFile(Path + "data/train.tsv")
    header = rawDataWithHeader.first()
    rawData = rawDataWithHeader.filter(lambda x: x != header)
    rData = rawData.map(lambda x: x.replace("\"", ""))
    lines = rData.map(lambda x: x.split("\t"))
    print("The number of data: " + str(lines.count()))
    print("Before standardization:")
    categoriesMap = lines.map(lambda fields: fields[3]) \
                         .distinct().zipWithIndex().collectAsMap()
    labelRDD = lines.map(lambda r: extract_label(r))
    featureRDD = lines.map(lambda r: extract_features(r, categoriesMap, len(r) - 1))
    for i in featureRDD.first():
        print(str(i) + ",", end="")
    print("")
    print("After standardization:")
    stdScaler = StandardScaler(withMean=True, withStd=True).fit(featureRDD)
    ScalerFeatureRDD = stdScaler.transform(featureRDD)
    for i in ScalerFeatureRDD.first():
        print(str(i) + ",", end="")
    labelpoint = labelRDD.zip(ScalerFeatureRDD)
    labelpointRDD = labelpoint.map(lambda r: LabeledPoint(r[0], r[1]))
    (trainData, validationData, testData) = labelpointRDD.randomSplit([8, 1, 1])
    print("trainData:" + str(trainData.count()) +
          " validationData:" + str(validationData.count()) +
          " testData:" + str(testData.count()))
    return (trainData, validationData, testData, categoriesMap)
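To see what StandardScaler does in isolation, here is a small self-contained sketch on invented data with two features of very different scales:

from pyspark import SparkContext
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.linalg import Vectors

sc = SparkContext("local[*]", "ScalerDemo")
features = sc.parallelize([Vectors.dense([1.0, 10000.0]),
                           Vectors.dense([2.0, 20000.0]),
                           Vectors.dense([3.0, 30000.0])])
scaler = StandardScaler(withMean=True, withStd=True).fit(features)
for v in scaler.transform(features).collect():
    print(v)  # each column now has zero mean and unit standard deviation
sc.stop()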

Step 3: Use the model to make predictions on the test set (the training itself happens in trainEvaluateModel in the next step)

def PredictData(sc, model, categoriesMap):
    print("Data loading...")
    rawDataWithHeader = sc.textFile(Path + "data/test.tsv")
    header = rawDataWithHeader.first()
    rawData = rawDataWithHeader.filter(lambda x: x != header)
    rData = rawData.map(lambda x: x.replace("\"", ""))
    lines = rData.map(lambda x: x.split("\t"))
    print("The number of data: " + str(lines.count()))
    # the test set has no label column, so every field from index 4 on is a feature;
    # note that these features are not passed through the StandardScaler fitted on
    # the training data, so feature scales differ between training and prediction here
    dataRDD = lines.map(lambda r: (r[0],
                                   extract_features(r, categoriesMap, len(r))))
    DescDict = {
        0: "ephemeral",
        1: "evergreen"
    }
    for data in dataRDD.take(10):
        predictResult = model.predict(data[1])
        print("Web: " + str(data[0]) + "\n" +
              " Prediction: " + str(predictResult) +
              " Illustration: " + DescDict[predictResult] + "\n")

Step 4: Train and evaluate the model (the parameters to tune are numIterations, stepSize, and miniBatchFraction)

def evaluateModel(model, validationData):
    score = model.predict(validationData.map(lambda p: p.features))
    score = score.map(lambda s: float(s))
    Labels = validationData.map(lambda p: p.label)
    Labels = Labels.map(lambda l: float(l))
    scoreAndLabels = score.zip(Labels)
    metrics = BinaryClassificationMetrics(scoreAndLabels)
    AUC = metrics.areaUnderROC
    return AUC
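The AUC computation itself can be sanity-checked in isolation. A minimal sketch, assuming an existing SparkContext sc and invented (score, label) pairs:

scoreAndLabels = sc.parallelize([(1.0, 1.0), (0.0, 0.0), (1.0, 0.0), (0.0, 1.0)])
print(BinaryClassificationMetrics(scoreAndLabels).areaUnderROC)  # 0.5 for this toy set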

def trainEvaluateModel(trainData, validationData,
                       numIterations, stepSize, miniBatchFraction):
    startTime = time()
    model = LogisticRegressionWithSGD.train(trainData, numIterations,
                                            stepSize, miniBatchFraction)
    AUC = evaluateModel(model, validationData)
    duration = time() - startTime
    print(" numIterations=" + str(numIterations) +
          " stepSize=" + str(stepSize) +
          " miniBatchFraction=" + str(miniBatchFraction) +
          " Time=" + str(duration) +
          " AUC=" + str(AUC))
    return (AUC, duration, numIterations, stepSize, miniBatchFraction, model)
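For readability, the same training call can be written with keyword arguments; train() also exposes regularization options (regParam, regType) that this note leaves at their defaults:

model = LogisticRegressionWithSGD.train(trainData,
                                        iterations=numIterations,
                                        step=stepSize,
                                        miniBatchFraction=miniBatchFraction)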

def evalParameter(trainData, validationData, evalparm,
                  numIterationsList, stepSizeList, miniBatchFractionList):
    metrics = [trainEvaluateModel(trainData, validationData,
                                  numIterations, stepSize, miniBatchFraction)
               for numIterations in numIterationsList
               for stepSize in stepSizeList
               for miniBatchFraction in miniBatchFractionList]
    if evalparm == "numIterations":
        IndexList = numIterationsList[:]
    elif evalparm == "stepSize":
        IndexList = stepSizeList[:]
    elif evalparm == "miniBatchFraction":
        IndexList = miniBatchFractionList[:]
    df = pd.DataFrame(metrics, index=IndexList,
                      columns=['AUC', 'duration', 'numIterations',
                               'stepSize', 'miniBatchFraction', 'model'])
    showchart(df, evalparm, 'AUC', 'duration', 0.5, 0.7)

def showchart(df, evalparm, barData, lineData, yMin, yMax):
    ax = df[barData].plot(kind='bar', title=evalparm, figsize=(10, 6),
                          legend=True, fontsize=12)
    ax.set_xlabel(evalparm, fontsize=12)
    ax.set_ylim([yMin, yMax])
    ax.set_ylabel(barData, fontsize=12)
    ax2 = ax.twinx()
    ax2.plot(df[[lineData]].values, linestyle='-', marker='o',
             linewidth=2.0, color='r')
    plt.show()

def evalAllParameter(trainData, validationData,
                     numIterationsList, stepSizeList, miniBatchFractionList):
    # train one model per parameter combination, then pick the best by AUC
    metrics = [trainEvaluateModel(trainData, validationData,
                                  numIterations, stepSize, miniBatchFraction)
               for numIterations in numIterationsList
               for stepSize in stepSizeList
               for miniBatchFraction in miniBatchFractionList]
    Smetrics = sorted(metrics, key=lambda k: k[0], reverse=True)
    bestParameter = Smetrics[0]
    print("Best parameter: numIterations:" + str(bestParameter[2]) +
          " ,stepSize:" + str(bestParameter[3]) +
          " ,miniBatchFraction:" + str(bestParameter[4]) +
          " ,AUC = " + str(bestParameter[0]))
    return bestParameter[5]

def parametersEval(trainData, validationData):
    print("numIterations")
    evalParameter(trainData, validationData, "numIterations",
                  numIterationsList=[5, 15, 20, 60, 100],
                  stepSizeList=[10],
                  miniBatchFractionList=[1])
    print("stepSize")
    evalParameter(trainData, validationData, "stepSize",
                  numIterationsList=[100],
                  stepSizeList=[10, 50, 100, 200],
                  miniBatchFractionList=[1])
    print("miniBatchFraction")
    evalParameter(trainData, validationData, "miniBatchFraction",
                  numIterationsList=[100],
                  stepSizeList=[100],
                  miniBatchFractionList=[0.5, 0.8, 1])

Step 5: Spark-related setup

def SetLogger(sc):
    logger = sc._jvm.org.apache.log4j
    logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
    logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)
    logger.LogManager.getRootLogger().setLevel(logger.Level.ERROR)

def SetPath(sc):
    global Path
    if sc.master[0:5] == "local":
        Path = "file:/home/jorlinlee/pythonsparkexample/PythonProject/"
    else:
        Path = "hdfs://master:9000/user/jorlinlee/"

def CreateSparkContext():
    conf = SparkConf().setMaster("local[*]").setAppName("LR")
    sc = SparkContext(conf=conf)
    print("master=" + sc.master)
    SetLogger(sc)
    SetPath(sc)
    return sc

Step 6: Run the main program

if __name__ == "__main__":
    print("RunLogisticRegressionWithSGDBinary")
    sc = CreateSparkContext()
    print("Data preparing")
    (trainData, validationData, testData, categoriesMap) = PrepareData(sc)
    trainData.persist(); validationData.persist(); testData.persist()
    print("Evaluating")
    (AUC, duration, numIterationsParm, stepSizeParm,
     miniBatchFractionParm, model) = trainEvaluateModel(trainData, validationData, 15, 10, 0.5)
    if (len(sys.argv) == 2) and (sys.argv[1] == "-e"):
        parametersEval(trainData, validationData)
    elif (len(sys.argv) == 2) and (sys.argv[1] == "-a"):
        print("Best parameters")
        model = evalAllParameter(trainData, validationData,
                                 [3, 5, 10, 15],
                                 [10, 50, 100],
                                 [0.5, 0.8, 1])
    print("test")
    auc = evaluateModel(model, testData)
    print("Test AUC:" + str(auc))
    print("Predict")
    PredictData(sc, model, categoriesMap)
    sc.stop()
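Assuming the code above is saved as RunLogisticRegressionWithSGDBinary.py (the file name is an assumption), it could be launched with spark-submit:

spark-submit RunLogisticRegressionWithSGDBinary.py      # train, test and predict with default parameters
spark-submit RunLogisticRegressionWithSGDBinary.py -e   # additionally evaluate each parameter separately
spark-submit RunLogisticRegressionWithSGDBinary.py -a   # additionally grid-search all parameter combinations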

Result:

Predict
Data loading...
The number of data: 3171
Web: http://www.lynnskitchenadventures.com/2009/04/homemade-enchilada-sauce.html
 Prediction: 0 Illustration: ephemeral

Web: http://lolpics.se/18552-stun-grenade-ar
 Prediction: 0 Illustration: ephemeral

Web: http://www.xcelerationfitness.com/treadmills.html
 Prediction: 0 Illustration: ephemeral

Web: http://www.bloomberg.com/news/2012-02-06/syria-s-assad-deploys-tactics-of-father-to-crush-revolt-threatening-reign.html
 Prediction: 0 Illustration: ephemeral

Web: http://www.wired.com/gadgetlab/2011/12/stem-turns-lemons-and-limes-into-juicy-atomizers/
 Prediction: 0 Illustration: ephemeral

Web: http://www.latimes.com/health/boostershots/la-heb-fat-tax-denmark-20111013,0,2603132.story
 Prediction: 0 Illustration: ephemeral

Web: http://www.howlifeworks.com/a/a?AG_ID=1186&cid=7340ci
 Prediction: 0 Illustration: ephemeral

Web: http://romancingthestoveblog.wordpress.com/2010/01/13/sweet-potato-ravioli-with-lemon-sage-brown-butter-sauce/
 Prediction: 0 Illustration: ephemeral

Web: http://www.funniez.net/Funny-Pictures/turn-men-down.html
 Prediction: 0 Illustration: ephemeral

Web: http://youfellasleepwatchingadvd.com/
 Prediction: 0 Illustration: ephemeral
