Spark LDA 实例
一、准备数据
数据格式为:documents: RDD[(Long, Vector)],其中:Long为文章ID,Vector为文章分词后的词向量;
通过分词以及数据格式的转换,转换成RDD[(Long, Vector)]即可。
二、建立模型
import org.apache.spark.mllib.clustering._
val ldaOptimizer = new OnlineLDAOptimizer().setOptimizeDocConcentration(true)
val lda = new LDA()
lda.setK(params.k)
.setMaxIterations(params.maxIterations)
.setDocConcentration(params.docConcentration)
.setTopicConcentration(params.topicConcentration)
.setOptimizer(ldaOptimizer)
.setCheckpointInterval(10)
.setSeed(1234)
val modelLDA: LDAModel = lda.run(corpus)
modelLDA.save(sc.sparkContext, params.modelPath)
三、模型参数
case class NewsParams(
k: Int = 100,
maxIterations: Int = 100,
docConcentration: Double = -1,
topicConcentration: Double = -1,
stopWordFile: String = "zh_stopwords.txt",
modelPath: String = "LDAModel.14.100",
ldaJsonPath:String = "ldaModel.14.200.json",
vocabPath: String = "vocab_info" )
四、结果输出
topicsMatrix以及topics(word,topic))输出。mllib上的lda不是分布式的,目前只存储topic的信息,而不存储doc的信息,如果获取只能使用ml中的lda或者通过以下代码实现。
val ldaModel = lda.run(documents)
val distLDAModel = ldaModel.asInstanceOf[DistributedLDAModel]
distLDAModel.topicDistributions