大家好啊,这里就不自我介绍了,我们说一下WordCount,也就是词频。大家可能在各种渠道学习数据处理都会是WordCount首当其冲,为什么呢?因为WordCount简单。但是可以很好的形容数据处理和数据统计。今天我们也跟风的讲一讲WordCount,但是呢?我们不是泛泛的讲讲,我们是抱着系统学习的态度开始的。因为实现WordCount的方法有很多,每一种方法都是不同的算子,都会让你有不同的收获。那就开始了哈。
一、数据源。
[root@host juana]# touch data.txt
[root@host juana]# vim data.txt
liubei,sunshangxiang,zhaoyun
minyue,guanyu,juyoujin,nakelulu
liubei,libai
libai,guanyu,bailishouyue
二、具体实现。
1、方法1
这是最原始的方法。
object WordCount {
def main(args: Array[String]): Unit = {
// 配置spark环境
val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
// 新建SparkContext
val sc = new SparkContext(conf)
//读取文件
sc.textFile("data/data.txt")
// 扁平化处理
.flatMap(line=>{line.split(",")})
// 逐个击破
.map(x=>(x,1)).reduceByKey(_+_)
// 逐个输出
.foreach(println)
// 关闭环境
sc.stop()
}
}
output
(liubei,2)
(zhaoyun,1)
(sunshangxiang,1)
(nakelulu,1)
(libai,2)
(juyoujin,1)
(guanyu,2)
(bailishouyue,1)
(minyue,1)
2、方法2
object WordCount {
def main(args: Array[String]): Unit = {
// 配置spark环境
val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
// 新建SparkContext
val sc = new SparkContext(conf)
//读取文件
sc.textFile("data/data.txt")
.flatMap(line => {line.split(" ")})
.map(data => (data, 1))
.groupBy(_._1)
.map(data=>(data._1, data._2.size))
.foreach(println)
sc.stop()
output
(liubei,2)
(zhaoyun,1)
(sunshangxiang,1)
(nakelulu,1)
(libai,2)
(juyoujin,1)
(guanyu,2)
(bailishouyue,1)
(minyue,1)
3、方法3
object WordCount {
def main(args: Array[String]): Unit = {
// 配置spark环境
val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
// 新建SparkContext
val sc = new SparkContext(conf)
//读取文件
sc.textFile("data/data.txt")
.flatMap(line => {line.split(" ")})
.map(data => (data, 1))
.groupByKey()
.map(data=>(data._1, data._2.size))
.foreach(println)
sc.stop()
output
(liubei,2)
(zhaoyun,1)
(sunshangxiang,1)
(nakelulu,1)
(libai,2)
(juyoujin,1)
(guanyu,2)
(bailishouyue,1)
(minyue,1)
4、方法4
object WordCount {
def main(args: Array[String]): Unit = {
// 配置spark环境
val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
// 新建SparkContext
val sc = new SparkContext(conf)
//读取文件
sc.textFile("data/data.txt")
.flatMap(line => line.split(" "))
.map(data => (data, 1))
.aggregateByKey(0)(_ + _, _ + _)
.foreach(println)
output
(liubei,2)
(zhaoyun,1)
(sunshangxiang,1)
(nakelulu,1)
(libai,2)
(juyoujin,1)
(guanyu,2)
(bailishouyue,1)
(minyue,1)
5、方法5
object WordCount {
def main(args: Array[String]): Unit = {
// 配置spark环境
val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
// 新建SparkContext
val sc = new SparkContext(conf)
//读取文件
sc.textFile("data/data.txt")
.flatMap(line => line.split(" "))
.map(data => (data, 1))
.foldByKey(0)( _ + _)
.foreach(println)
output
(liubei,2)
(zhaoyun,1)
(sunshangxiang,1)
(nakelulu,1)
(libai,2)
(juyoujin,1)
(guanyu,2)
(bailishouyue,1)
(minyue,1)
6、方法6
object WordCount {
def main(args: Array[String]): Unit = {
// 配置spark环境
val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
// 新建SparkContext
val sc = new SparkContext(conf)
//读取文件
sc.textFile("data/data.txt")
.flatMap(line => line.split(" "))
.map(data => (data, 1))
.combineByKey(v=>v,(x:Int,y)=>(x+y),(x:Int,y)=>(x+y))
.foreach(println)
output
(liubei,2)
(zhaoyun,1)
(sunshangxiang,1)
(nakelulu,1)
(libai,2)
(juyoujin,1)
(guanyu,2)
(bailishouyue,1)
(minyue,1)
7、方法7
object WordCount {
def main(args: Array[String]): Unit = {
// 配置spark环境
val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
// 新建SparkContext
val sc = new SparkContext(conf)
//读取文件
val Rdd: RDD[String] = sc.textFile("data/data.txt")
val rdd: RDD[String] = Rdd.flatMap(line => {
val strings: Array[String] = line.split(",")
strings
})
val stringToLong: collection.Map[String, Long] = rdd.map(data => (data, 1)).countByKey()
println(stringToLong)
output
Map(
nakelulu -> 1,
juyoujin -> 1,
sunshangxiang -> 1,
libai -> 2,
minyue -> 1,
zhaoyun -> 1,
liubei -> 2,
guanyu -> 2,
bailishouyue -> 1
)
8、方法8
object WordCount {
def main(args: Array[String]): Unit = {
// 配置spark环境
val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
// 新建SparkContext
val sc = new SparkContext(conf)
//读取文件
val Rdd: RDD[String] = sc.textFile("data/data.txt")
val rdd: RDD[String] = Rdd.flatMap(line => {
val strings: Array[String] = line.split(",")
strings
})
val stringToLong: collection.Map[String, Long] = rdd.countByValue()
println(stringToLong)
output
Map(
nakelulu -> 1,
juyoujin -> 1,
sunshangxiang -> 1,
libai -> 2,
minyue -> 1,
zhaoyun -> 1,
liubei -> 2,
guanyu -> 2,
bailishouyue -> 1
)
9、方法9
object WordCount {
def main(args: Array[String]): Unit = {
// 配置spark环境
val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
// 新建SparkContext
val sc = new SparkContext(conf)
//读取文件
val Rdd: RDD[String] = sc.textFile("data/data.txt")
val rdd: RDD[String] = Rdd.flatMap(line => {
val strings: Array[String] = line.split(",")
strings
})
val RDD1: RDD[mutable.Map[String, Long]] = rdd.map(word => mutable.Map[String, Long]((word, 1L)))
val stringToInt: mutable.Map[String, Long] = RDD1.reduce((map1, map2) => {
map2.foreach {
case (word, count) =>
val newCount: Long = map1.getOrElse(word, 0L) + count
map1.update(word, newCount)
}
map1
}
)
println(stringToInt)
output
Map(
nakelulu -> 1,
juyoujin -> 1,
sunshangxiang -> 1,
libai -> 2,
minyue -> 1,
zhaoyun -> 1,
liubei -> 2,
guanyu -> 2,
bailishouyue -> 1
)
10、方法10
object WordCount {
def main(args: Array[String]): Unit = {
// 配置spark环境
val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
// 新建SparkContext
val sc = new SparkContext(conf)
//读取文件
val Rdd: RDD[String] = sc.textFile("data/data.txt")
val rdd: RDD[String] = Rdd.flatMap(line => {
val strings: Array[String] = line.split(",")
strings
})
val RDD1: RDD[mutable.Map[String, Int]] = rdd.map(word => mutable.Map[String, Int]((word, 1)))
val stringToInt: mutable.Map[String, Int] = RDD1.aggregate(mutable.Map[String, Int]())((map1, map2) => {
map2.foreach {
case (word, count) =>
val newCount: Int = map1.getOrElse(word, 0) + count
map1.update(word, newCount)
}
map1
}, (map1, map2) => {
map2.foreach {
case (word, count) =>
val newCount: Int = map1.getOrElse(word, 0) + count
map1.update(word, newCount)
}
map1
})
println(stringToInt)
output
Map(
nakelulu -> 1,
juyoujin -> 1,
sunshangxiang -> 1,
libai -> 2,
minyue -> 1,
zhaoyun -> 1,
liubei -> 2,
guanyu -> 2,
bailishouyue -> 1
)
11、方法11
object WordCount {
def main(args: Array[String]): Unit = {
// 配置spark环境
val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
// 新建SparkContext
val sc = new SparkContext(conf)
//读取文件
val Rdd: RDD[String] = sc.textFile("data/data.txt")
val rdd: RDD[String] = Rdd.flatMap(line => {
val strings: Array[String] = line.split(",")
strings})
val RDD1: RDD[mutable.Map[String, Int]] = rdd.map(word => mutable.Map[String, Int]((word, 1)))
val stringToInt: mutable.Map[String, Int] = RDD1.fold(mutable.Map[String, Int]())((map1, map2) => {
map2.foreach {
case (word, count) =>
val newCount: Int = map1.getOrElse(word, 0) + count
map1.update(word, newCount)
}
map1
})
println(stringToInt)
output
Map(
nakelulu -> 1,
juyoujin -> 1,
sunshangxiang -> 1,
libai -> 2,
minyue -> 1,
zhaoyun -> 1,
liubei -> 2,
guanyu -> 2,
bailishouyue -> 1
)
好了,上面就是我们的11种实现方法,那现在我们来做一个总结吧。
大家可能注意到了,方法6及以前都是单个输出,但是方法7以及以后j结果都是Map?是巧合吗?还是道德的缺失?
好了不扯了,主要是因为实现WordCount的主要算子不一样。前面都是Transformation (转换)算子,后面是Action (行动)算子。
来,瞧瞧吧。
Transformation 是惰性算子,待需要的时候执行,Action 是活动算子,直接生成任务执行。一个Action 对应着一个任务。
算子类型 | 实现主要算子 | 算子简介 |
---|---|---|
Transformation | groupBy | value数据类型分组算子,使用需要指定分组值,返回值是二元组(k,迭代器) |
Transformation | groupByKey | Key-Value数据类型分组算子,不需要指定分组数据,直接按照k分组。返回值是二元组(k,迭代器) |
Transformation | reduceByKey | Key-Value数据类型分组聚合算子,不需要指定分组数据,直接按照k分组,需要指定聚合函数(分区间函数和分区类函数一样) |
Transformation | aggregateByKey | Key-Value数据类型分组聚合算子,不需要指定分组数据,直接按照k分组,需要指定聚合函数(分区间和分区类函数不一样)和聚合初始值,柯里化 |
Transformation | foldByKey | 分组聚合类似于aggregateByKey 只是foldByKey可以表示分区间和分区类的计算逻辑是一样的,柯里化 |
Transformation | combineByKey | Key-Value数据类型分组聚合算子,有三个参数第一个参数:对第一个数据做修饰,第二个参数:分区内聚合函数, 第三个参数:分区间聚合函数,中间变量的类型有可能编译没办法识别,需要标明泛型 |
Action | countByKey | 按照key值进行分组聚合,底层调用的是reduceByKey(+) |
Action | countByValue | 直接聚合,底层调用countByKey ,map(value => (value, null)).countByKey() |
Action | reduce | 聚合算子,底层需要自己去实现,存在的价值是自定义底层。见上面的用法即明白 |
Action | fold | 比reduce算子,多一个参数,可以设置聚合时中间临时变量的初始值] |
Action | aggregate | 可以执行分区间聚合和分区类聚合,比如fold多一个参数,分别设置RDD数据集合时局部聚合函数和全局聚合函数 |
以上只是算子的简单介绍,后面我们会对其原理以及源码进行说明。这些简介配合用法大家先看着理解哈,各位看客,怠慢了。