spark-万物之源WordCount(四)

Spark实现WordCount的N种方法


  大家好啊,这里就不自我介绍了,我们说一下WordCount,也就是词频。大家可能在各种渠道学习数据处理都会是WordCount首当其冲,为什么呢?因为WordCount简单。但是可以很好的形容数据处理和数据统计。今天我们也跟风的讲一讲WordCount,但是呢?我们不是泛泛的讲讲,我们是抱着系统学习的态度开始的。因为实现WordCount的方法有很多,每一种方法都是不同的算子,都会让你有不同的收获。那就开始了哈。 spark-万物之源WordCount(四)

一、数据源。

[root@host juana]# touch data.txt
[root@host juana]# vim data.txt
liubei,sunshangxiang,zhaoyun
minyue,guanyu,juyoujin,nakelulu
liubei,libai
libai,guanyu,bailishouyue

二、具体实现。

1、方法1

  这是最原始的方法。

object WordCount {
  def main(args: Array[String]): Unit = {
	// 配置spark环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
	// 新建SparkContext
    val sc = new SparkContext(conf)
    //读取文件
    sc.textFile("data/data.txt")
    // 扁平化处理
      .flatMap(line=>{line.split(",")})
     // 逐个击破
      .map(x=>(x,1)).reduceByKey(_+_)
     // 逐个输出
      .foreach(println)
     // 关闭环境
    sc.stop()
  }
}

output

(liubei,2)
(zhaoyun,1)
(sunshangxiang,1)
(nakelulu,1)
(libai,2)
(juyoujin,1)
(guanyu,2)
(bailishouyue,1)
(minyue,1)

2、方法2

object WordCount {
  def main(args: Array[String]): Unit = {
	// 配置spark环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
	// 新建SparkContext
    val sc = new SparkContext(conf)
    //读取文件
    sc.textFile("data/data.txt")
    .flatMap(line => {line.split(" ")})
    .map(data => (data, 1))
    .groupBy(_._1)
    .map(data=>(data._1, data._2.size))
    .foreach(println)
    sc.stop()

output

(liubei,2)
(zhaoyun,1)
(sunshangxiang,1)
(nakelulu,1)
(libai,2)
(juyoujin,1)
(guanyu,2)
(bailishouyue,1)
(minyue,1)

3、方法3

object WordCount {
  def main(args: Array[String]): Unit = {
	// 配置spark环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
	// 新建SparkContext
    val sc = new SparkContext(conf)
    //读取文件
    sc.textFile("data/data.txt")
    .flatMap(line => {line.split(" ")})
	.map(data => (data, 1))
	.groupByKey()
	.map(data=>(data._1, data._2.size))
	.foreach(println)
    sc.stop()

output

(liubei,2)
(zhaoyun,1)
(sunshangxiang,1)
(nakelulu,1)
(libai,2)
(juyoujin,1)
(guanyu,2)
(bailishouyue,1)
(minyue,1)

4、方法4

object WordCount {
  def main(args: Array[String]): Unit = {
	// 配置spark环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
	// 新建SparkContext
    val sc = new SparkContext(conf)
    //读取文件
    sc.textFile("data/data.txt")
    .flatMap(line => line.split(" "))
	.map(data => (data, 1))
	.aggregateByKey(0)(_ + _, _ + _)
    .foreach(println)

output

(liubei,2)
(zhaoyun,1)
(sunshangxiang,1)
(nakelulu,1)
(libai,2)
(juyoujin,1)
(guanyu,2)
(bailishouyue,1)
(minyue,1)

5、方法5

object WordCount {
  def main(args: Array[String]): Unit = {
	// 配置spark环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
	// 新建SparkContext
    val sc = new SparkContext(conf)
    //读取文件
    sc.textFile("data/data.txt")
    .flatMap(line => line.split(" "))
	.map(data => (data, 1))
	.foldByKey(0)( _ + _)
	.foreach(println)

output

(liubei,2)
(zhaoyun,1)
(sunshangxiang,1)
(nakelulu,1)
(libai,2)
(juyoujin,1)
(guanyu,2)
(bailishouyue,1)
(minyue,1)

6、方法6

object WordCount {
  def main(args: Array[String]): Unit = {
	// 配置spark环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
	// 新建SparkContext
    val sc = new SparkContext(conf)
    //读取文件
    sc.textFile("data/data.txt")
    .flatMap(line => line.split(" "))
	.map(data => (data, 1))
	.combineByKey(v=>v,(x:Int,y)=>(x+y),(x:Int,y)=>(x+y))
    .foreach(println)

output

(liubei,2)
(zhaoyun,1)
(sunshangxiang,1)
(nakelulu,1)
(libai,2)
(juyoujin,1)
(guanyu,2)
(bailishouyue,1)
(minyue,1)

7、方法7

object WordCount {
  def main(args: Array[String]): Unit = {
	// 配置spark环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
	// 新建SparkContext
    val sc = new SparkContext(conf)
    //读取文件
	val Rdd: RDD[String] = sc.textFile("data/data.txt")
  	val rdd: RDD[String] = Rdd.flatMap(line => {
    val strings: Array[String] = line.split(",")
    strings
  		})
   val stringToLong: collection.Map[String, Long] = rdd.map(data => (data, 1)).countByKey()
    println(stringToLong)

output

Map(
 nakelulu -> 1,
 juyoujin -> 1,
 sunshangxiang -> 1,
 libai -> 2,
 minyue -> 1,
 zhaoyun -> 1,
 liubei -> 2, 
 guanyu -> 2, 
 bailishouyue -> 1
 )

8、方法8

object WordCount {
  def main(args: Array[String]): Unit = {
	// 配置spark环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
	// 新建SparkContext
    val sc = new SparkContext(conf)
    //读取文件
	val Rdd: RDD[String] = sc.textFile("data/data.txt")
  	val rdd: RDD[String] = Rdd.flatMap(line => {
    val strings: Array[String] = line.split(",")
    strings
  		})
   val stringToLong: collection.Map[String, Long] = rdd.countByValue()
    println(stringToLong)

output

Map(
 nakelulu -> 1,
 juyoujin -> 1,
 sunshangxiang -> 1,
 libai -> 2,
 minyue -> 1,
 zhaoyun -> 1,
 liubei -> 2, 
 guanyu -> 2, 
 bailishouyue -> 1
 )

9、方法9

object WordCount {
  def main(args: Array[String]): Unit = {
	// 配置spark环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
	// 新建SparkContext
    val sc = new SparkContext(conf)
    //读取文件
	val Rdd: RDD[String] = sc.textFile("data/data.txt")
  	val rdd: RDD[String] = Rdd.flatMap(line => {
    val strings: Array[String] = line.split(",")
    strings
  		})
    val RDD1: RDD[mutable.Map[String, Long]] = rdd.map(word => mutable.Map[String, Long]((word, 1L)))
    val stringToInt: mutable.Map[String, Long] = RDD1.reduce((map1, map2) => {
      map2.foreach {
        case (word, count) =>
          val newCount: Long = map1.getOrElse(word, 0L) + count
          map1.update(word, newCount)
      }
      map1
    }
    )
    println(stringToInt)

output

Map(
 nakelulu -> 1,
 juyoujin -> 1,
 sunshangxiang -> 1,
 libai -> 2,
 minyue -> 1,
 zhaoyun -> 1,
 liubei -> 2, 
 guanyu -> 2, 
 bailishouyue -> 1
 )

10、方法10

object WordCount {
  def main(args: Array[String]): Unit = {
	// 配置spark环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
	// 新建SparkContext
    val sc = new SparkContext(conf)
    //读取文件
	val Rdd: RDD[String] = sc.textFile("data/data.txt")
  	val rdd: RDD[String] = Rdd.flatMap(line => {
    val strings: Array[String] = line.split(",")
    strings
  		})
    val RDD1: RDD[mutable.Map[String, Int]] = rdd.map(word => mutable.Map[String, Int]((word, 1)))

    val stringToInt: mutable.Map[String, Int] = RDD1.aggregate(mutable.Map[String, Int]())((map1, map2) => {
      map2.foreach {
        case (word, count) =>
          val newCount: Int = map1.getOrElse(word, 0) + count
          map1.update(word, newCount)
      }
      map1
    }, (map1, map2) => {
      map2.foreach {
        case (word, count) =>
          val newCount: Int = map1.getOrElse(word, 0) + count
          map1.update(word, newCount)
      }
      map1
    })
    println(stringToInt)

output

Map(
 nakelulu -> 1,
 juyoujin -> 1,
 sunshangxiang -> 1,
 libai -> 2,
 minyue -> 1,
 zhaoyun -> 1,
 liubei -> 2, 
 guanyu -> 2, 
 bailishouyue -> 1
 )

11、方法11

object WordCount {
  def main(args: Array[String]): Unit = {
	// 配置spark环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("wc")
	// 新建SparkContext
    val sc = new SparkContext(conf)
    //读取文件
	val Rdd: RDD[String] = sc.textFile("data/data.txt")
  	val rdd: RDD[String] = Rdd.flatMap(line => {
    val strings: Array[String] = line.split(",")
    strings})
    val RDD1: RDD[mutable.Map[String, Int]] = rdd.map(word => mutable.Map[String, Int]((word, 1)))
    val stringToInt: mutable.Map[String, Int] = RDD1.fold(mutable.Map[String, Int]())((map1, map2) => {
      map2.foreach {
        case (word, count) =>
          val newCount: Int = map1.getOrElse(word, 0) + count
          map1.update(word, newCount)
      }
      map1
    })
    println(stringToInt)

output

Map(
 nakelulu -> 1,
 juyoujin -> 1,
 sunshangxiang -> 1,
 libai -> 2,
 minyue -> 1,
 zhaoyun -> 1,
 liubei -> 2, 
 guanyu -> 2, 
 bailishouyue -> 1
 )

好了,上面就是我们的11种实现方法,那现在我们来做一个总结吧。
大家可能注意到了,方法6及以前都是单个输出,但是方法7以及以后j结果都是Map?是巧合吗?还是道德的缺失?
spark-万物之源WordCount(四)
好了不扯了,主要是因为实现WordCount的主要算子不一样。前面都是Transformation (转换)算子,后面是Action (行动)算子。
来,瞧瞧吧。

Transformation 是惰性算子,待需要的时候执行,Action 是活动算子,直接生成任务执行。一个Action 对应着一个任务。

算子类型 实现主要算子 算子简介
Transformation groupBy value数据类型分组算子,使用需要指定分组值,返回值是二元组(k,迭代器)
Transformation groupByKey Key-Value数据类型分组算子,不需要指定分组数据,直接按照k分组。返回值是二元组(k,迭代器)
Transformation reduceByKey Key-Value数据类型分组聚合算子,不需要指定分组数据,直接按照k分组,需要指定聚合函数(分区间函数和分区类函数一样)
Transformation aggregateByKey Key-Value数据类型分组聚合算子,不需要指定分组数据,直接按照k分组,需要指定聚合函数(分区间和分区类函数不一样)和聚合初始值,柯里化
Transformation foldByKey 分组聚合类似于aggregateByKey 只是foldByKey可以表示分区间和分区类的计算逻辑是一样的,柯里化
Transformation combineByKey Key-Value数据类型分组聚合算子,有三个参数第一个参数:对第一个数据做修饰,第二个参数:分区内聚合函数, 第三个参数:分区间聚合函数,中间变量的类型有可能编译没办法识别,需要标明泛型
Action countByKey 按照key值进行分组聚合,底层调用的是reduceByKey(+)
Action countByValue 直接聚合,底层调用countByKey ,map(value => (value, null)).countByKey()
Action reduce 聚合算子,底层需要自己去实现,存在的价值是自定义底层。见上面的用法即明白
Action fold 比reduce算子,多一个参数,可以设置聚合时中间临时变量的初始值]
Action aggregate 可以执行分区间聚合和分区类聚合,比如fold多一个参数,分别设置RDD数据集合时局部聚合函数和全局聚合函数


以上只是算子的简单介绍,后面我们会对其原理以及源码进行说明。这些简介配合用法大家先看着理解哈,各位看客,怠慢了。

上一篇:Spark—WordCount


下一篇:解决 docker 容器时间与本地时间不一致