六、clustering
K-means
k均值是最常用的聚类算法之一,它将数据点聚集成预定数量的聚类。 MLlib实现包括k-means ++方法的并行变体,称为kmeans ||。
KMeans被实现为Estimator,并生成KMeansModel作为基础模型。
1)Input and Ouputs(输入参数和输出参数)
示例代码
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator
// Loads data.
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
// Trains a k-means model.
val kmeans = new KMeans().setK(2).setSeed(1L)
val model = kmeans.fit(dataset)
// Make predictions
val predictions = model.transform(dataset)
// Evaluate clustering by computing Silhouette score
val evaluator = new ClusteringEvaluator()
val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance = $silhouette")
// Shows the result.
println("Cluster Centers: ")
model.clusterCenters.foreach(println)
Latent Dirichlet allocation
LDA被实现为支持EMLDAOptimizer和OnlineLDAOptimizer的Estimator,并生成LDAModel作为基础模型。如果需要,专家用户可以将EMLDAOptimizer生成的LDAModel强制转换为DistributedLDAModel。
示例代码
import org.apache.spark.ml.clustering.LDA
// Loads data.
val dataset = spark.read.format("libsvm")
.load("data/mllib/sample_lda_libsvm_data.txt")
// Trains a LDA model.
val lda = new LDA().setK(10).setMaxIter(10)
val model = lda.fit(dataset)
val ll = model.logLikelihood(dataset)
val lp = model.logPerplexity(dataset)
println(s"The lower bound on the log likelihood of the entire corpus: $ll")
println(s"The upper bound on perplexity: $lp")
// Describe topics.
val topics = model.describeTopics(3)
println("The topics described by their top-weighted terms:")
topics.show(false)
// Shows the result.
val transformed = model.transform(dataset)
transformed.show(false)
Bisecting k-means(均分k均值)
将k均值平分是一种使用除法(或“自上而下”)方法的层次聚类:所有观测值都在一个聚类中开始,并且随着一个人在层次中向下移动而递归执行拆分。
平分K均值通常会比常规K均值快得多,但通常会产生不同的聚类。
BisectingKMeans被实现为一个估计器,并生成BisectingKMeansModel作为基础模型。
import org.apache.spark.ml.clustering.BisectingKMeans
// Loads data.
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
// Trains a bisecting k-means model.
val bkm = new BisectingKMeans().setK(2).setSeed(1)
val model = bkm.fit(dataset)
// Evaluate clustering.
val cost = model.computeCost(dataset)
println(s"Within Set Sum of Squared Errors = $cost")
// Shows the result.
println("Cluster Centers: ")
val centers = model.clusterCenters
centers.foreach(println)
Gaussion Mixture Model(GMM-高斯混合模型)
高斯混合模型表示一种复合分布,其中从k个高斯子分布之一中抽取点,每个子分布都有自己的概率。 spark.ml实现使用期望最大化算法在给定一组样本的情况下得出最大似然模型。
GaussianMixture被实现为Estimator,并生成GaussianMixtureModel作为基础模型。
import org.apache.spark.ml.clustering.GaussianMixture
// Loads data
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
// Trains Gaussian Mixture Model
val gmm = new GaussianMixture()
.setK(2)
val model = gmm.fit(dataset)
// output parameters of mixture model model
for (i <- 0 until model.getK) {
println(s"Gaussian $i:\nweight=${model.weights(i)}\n" +
s"mu=${model.gaussians(i).mean}\nsigma=\n${model.gaussians(i).cov}\n")
}
九、model selection and hyperparameter tuning
Model selection (a.k.a. hyperparameter tuning)
ML中的一项重要任务是模型选择,或使用数据为给定任务找到最佳模型或参数。这也称为调整。可以针对单个估算器(例如LogisticRegression)进行调整,也可以针对包括多个算法,功能化和其他步骤的整个管道进行调整。用户可以一次调整整个管道,而不必分别调整管道中的每个元素。
MLlib使用诸如CrossValidator和TrainValidationSplit之类的工具支持模型选择。这些工具需要以下各项:
- Estimator: 调整算法或管道
- Set of ParamMaps: 可供选择的参数,有时也称为“参数网格”
- Evaluator: 度量拟合模型对保留的测试数据的性能的度量
在较高级别,这些模型选择工具的工作方式如下:
- 他们将输入数据分为单独的训练和测试数据集。
- 对于每对(训练,测试),它们都会遍历一组ParamMap:
- 对于每个ParamMap,他们使用这些参数拟合Estimator,获得拟合的Model,然后使用Evaluator评估Model的性能。
- 他们选择由性能最佳的参数集生成的模型。
该评估器可以是用于回归问题的RegressionEvaluator,用于二进制数据的BinaryClassificationEvaluator或用于多类问题的MulticlassClassificationEvaluator。每个评估器中的setMetricName方法都可以覆盖用于选择最佳ParamMap的默认度量。
为了帮助构造参数网格,用户可以使用ParamGridBuilder实用程序。默认情况下,来自参数网格的参数集是串行评估的。在使用CrossValidator或TrainValidationSplit运行模型选择之前,可以通过将并行度设置为2或更大(值1为串行)来并行执行参数评估。应当仔细选择并行度的值,以在不超出群集资源的情况下最大程度地提高并行度,并且较大的值可能并不总是可以提高性能。一般来说,对于大多数群集,最大为10的值就足够了。
Cross-Validation
CrossValidator首先将数据集分成一组折叠,这些折叠用作单独的训练和测试数据集。例如,如果k = 3倍,CrossValidator将生成3个(训练,测试)数据集对,每个对都使用2/3的数据进行训练,而使用1/3的数据进行测试。 为了评估特定的ParamMap,CrossValidator为3个模型(通过将Estimator拟合到3个不同的(训练,测试)数据集对上)计算出平均评估指标。
在确定最佳的ParamMap之后,CrossValidator最终使用最佳的ParamMap和整个数据集重新拟合Estimator。
示例:通过交叉验证选择模型
下面的示例演示如何使用CrossValidator从参数网格中进行选择。
请注意,在参数网格上进行交叉验证的成本很高。例如,在下面的示例中,参数网格具有3个值的hashingTF.numFeatures和2个值的lr.regParam,而CrossValidator使用2折。这乘以(3×2)×2 = 12个正在训练的不同模型。在实际设置中,尝试更多的参数并使用更多的折叠数(通常是k = 3和k = 10)是很常见的。换句话说,使用CrossValidator可能非常昂贵。但是,这也是一种公认的用于选择参数的方法,该方法在统计上比启发式手动调整更合理。
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.sql.Row
// Prepare training data from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
(0L, "a b c d e spark", 1.0),
(1L, "b d", 0.0),
(2L, "spark f g h", 1.0),
(3L, "hadoop mapreduce", 0.0),
(4L, "b spark who", 1.0),
(5L, "g d a y", 0.0),
(6L, "spark fly", 1.0),
(7L, "was mapreduce", 0.0),
(8L, "e spark program", 1.0),
(9L, "a e c l", 0.0),
(10L, "spark compile", 1.0),
(11L, "hadoop software", 0.0)
)).toDF("id", "text", "label")
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF = new HashingTF()
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr = new LogisticRegression()
.setMaxIter(10)
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
// We use a ParamGridBuilder to construct a grid of parameters to search over.
// With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
// this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
val paramGrid = new ParamGridBuilder()
.addGrid(hashingTF.numFeatures, Array(10, 100, 1000))
.addGrid(lr.regParam, Array(0.1, 0.01))
.build()
// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
// This will allow us to jointly choose parameters for all Pipeline stages.
// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
// Note that the evaluator here is a BinaryClassificationEvaluator and its default metric
// is areaUnderROC.
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(new BinaryClassificationEvaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(2) // Use 3+ in practice
.setParallelism(2) // Evaluate up to 2 parameter settings in parallel
// Run cross-validation, and choose the best set of parameters.
val cvModel = cv.fit(training)
// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
(4L, "spark i j k"),
(5L, "l m n"),
(6L, "mapreduce spark"),
(7L, "apache hadoop")
)).toDF("id", "text")
// Make predictions on test documents. cvModel uses the best model found (lrModel).
cvModel.transform(test)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}
Train-Validation Split
除CrossValidator外,Spark还提供TrainValidationSplit用于超参数调整。 TrainValidationSplit仅对每个参数组合进行一次评估,而对于CrossValidator而言,则仅进行k次评估。因此,它便宜一些,但是当训练数据集不够大时,不会产生可靠的结果。
与CrossValidator不同,TrainValidationSplit创建单个(训练,测试)数据集对。它将使用trainRatio参数将数据集分为这两部分。例如,对于trainRatio = 0.75,TrainValidationSplit将生成训练和测试数据集对,其中75%的数据用于训练,而25%的数据用于验证。
像CrossValidator一样,TrainValidationSplit最终使用最佳的ParamMap和整个数据集来拟合Estimator。
示例:通过训练集验证拆分选择模型
示例代码
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
// Prepare training and test data.
val data = spark.read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt")
val Array(training, test) = data.randomSplit(Array(0.9, 0.1), seed = 12345)
val lr = new LinearRegression()
.setMaxIter(10)
// We use a ParamGridBuilder to construct a grid of parameters to search over.
// TrainValidationSplit will try all combinations of values and determine best model using
// the evaluator.
val paramGrid = new ParamGridBuilder()
.addGrid(lr.regParam, Array(0.1, 0.01))
.addGrid(lr.fitIntercept)
.addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
.build()
// In this case the estimator is simply the linear regression.
// A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
val trainValidationSplit = new TrainValidationSplit()
.setEstimator(lr)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
// 80% of the data will be used for training and the remaining 20% for validation.
.setTrainRatio(0.8)
// Evaluate up to 2 parameter settings in parallel
.setParallelism(2)
// Run train validation split, and choose the best set of parameters.
val model = trainValidationSplit.fit(training)
// Make predictions on test data. model is the model with combination of parameters
// that performed best.
model.transform(test)
.select("features", "label", "prediction")
.show()
TIAN_R
发布了2 篇原创文章 · 获赞 0 · 访问量 575
私信
关注