import com.hankcs.hanlp.HanLP
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
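/**
 * Word-count analysis of the Sogou search log (SogouQ) with Spark Core:
 *   1. top 10 search keywords (queries segmented with HanLP)
 *   2. top 10 (user id, keyword) pairs
 *   3. top 10 query times, bucketed to the minute
 * args(0) is the input path; args(1)-args(3) are the three output paths.
 */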
object SoGou_WC {
def main(args: Array[String]): Unit = {
// Set up the Spark environment
// val conf: SparkConf = new SparkConf().setAppName("SouGou_WC").setMaster("local[*]")
val conf: SparkConf = new SparkConf().setAppName("SouGou_WC") // cluster mode: the master is supplied via spark-submit
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
// Read the raw log data
// val data: RDD[String] = sc.textFile("data/input/SogouQ.sample")
val data: RDD[String] = sc.textFile(args(0))
// Parse each line into a SogouData record
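// Each SogouQ line splits on whitespace into six fields, e.g. (illustrative):
//   00:00:00  2982199073774412  [360安全卫士]  8 3  download.it.com.cn/...
// i.e. access time, user id, [query], rank of the clicked URL, click order, URL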
val sogouDataRDD: RDD[SogouData] = data.map(line => {
val arr: Array[String] = line.split("\\s+")
SogouData(
arr(0),
arr(1),
arr(2),
arr(3).toInt,
arr(4).toInt,
arr(5)
)
})
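// sogouDataRDD is reused by all three jobs below; uncomment to cache it and
// avoid re-reading and re-parsing the input for each job:
// sogouDataRDD.cache()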
// Job 1: top 10 search keywords, after segmenting each query with HanLP
val res1: Array[(String, Int)] = sogouDataRDD.flatMap(record => {
val str: String = record.data3.replaceAll("\\[|\\]", "") // strip the [ ] around the query
import scala.collection.JavaConverters._
HanLP.segment(str).asScala.map(_.word)
})
.filter(word => !word.equals("+") && !word.equals(".")) // drop noise tokens left by the segmenter
.map((_, 1))
.reduceByKey(_ + _)
.sortBy(_._2, false)
.take(10)
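// Job 2: top 10 (user id, keyword) pairs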
val res2: Array[((String, String), Int)] = sogouDataRDD.flatMap(record => {
val str: String = record.data3.replaceAll("\\[|\\]", "") // strip the [ ] around the query
import scala.collection.JavaConverters._
val words: mutable.Buffer[String] = HanLP.segment(str).asScala.map(_.word)
val userid: String = record.data2
words.map(word => (userid, word))
})
.filter(tuple => !tuple._2.equals("+") && !tuple._2.equals(".")) // drop noise tokens left by the segmenter
.map((_, 1))
.reduceByKey(_ + _)
.sortBy(_._2, false)
.take(10)
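// Job 3: top 10 query times, bucketed to the minute (HH:MM)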
val res3: Array[(String, Int)] = sogouDataRDD.map(record => {
val time: String = record.data1 // access time, e.g. "00:00:00"
val hourMinute: String = time.substring(0, 5) // keep only HH:MM
(hourMinute, 1)
})
.reduceByKey(_ + _)
.sortBy(_._2, false)
.take(10)
// Print and save the results
res1.foreach(println)
res2.foreach(println)
res3.foreach(println)
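// coalesce(1, shuffle = true) behaves like repartition(1), so each result is
// written as a single part file under its output directory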
sc.makeRDD(res1).coalesce(1,true).saveAsTextFile(args(1))
sc.makeRDD(res2).coalesce(1,true).saveAsTextFile(args(2))
sc.makeRDD(res3).coalesce(1,true).saveAsTextFile(args(3))
// sc.makeRDD(res1).coalesce(1,true).saveAsTextFile("data/output1")
// sc.makeRDD(res2).coalesce(1,true).saveAsTextFile("data/output2")
// sc.makeRDD(res3).coalesce(1,true).saveAsTextFile("data/output3")
// System.setProperty("HADOOP_USER_NAME","node") // HDFS write permission
// Shut down the Spark context
sc.stop()
}
case class SogouData(
  data1: String, // access time (HH:MM:SS)
  data2: String, // user id
  data3: String, // query keyword, wrapped in [ ]
  data4: Int,    // rank of the clicked URL in the returned results
  data5: Int,    // sequence number of the user's click
  data6: String  // clicked URL
)
}