spark中 reduceByKey() 和 groupByKey() 的 区别

1.groupByKey
它是将RDD中相同的key值得数据(value)合并成为一序列,只能输出相同key值得序列。

2.reduceByKey
因为内部调用的combineByKey函数,会先进行局部聚合, 再进行全局聚合,(类似于mapreduce里的combine操作)这样会大大减少网络IO, 起到优化作用。

相同点:

  • 都作用于 RDD[K,V]
  • 都是根据key来分组聚合
  • 默认,分区的数量都是不变的,但是都可以通过参数来指定分区数量

不同点:

  • groupByKey默认没有聚合函数,得到的返回值类型是RDD[ k,Iterable[V]]
  • reduceByKey 必须传聚合函数 得到的返回值类型 RDD[(K,聚合后的V)]
  • groupByKey().map() = reduceByKey

最重要的区别:

  • reduceByKey 会进行分区内聚合,然后再进行网络传输
  • groupByKey 不会进行局部聚合

结论:

  • 如果这两个算子,都可以使用, 优先使用reduceByKey

代码演示:
groupByKey

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

/**
  * @date :2021/3/30 15:24
  * @author :xiaotao
  * @description :groupByKey
  */
object groupByKeyDemo {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val ss: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName(this.getClass.getSimpleName)
      .getOrCreate()

    val arr = Array(("A", 1), ("B", 2), ("B", 5), ("A", 3), ("A", 4))
    val rdd = ss.sparkContext.parallelize(arr)

    val groupByKeyRDD = rdd.groupByKey()

    val sumValueRDD = groupByKeyRDD.map(x => {
      val ite = x._2
      val array = ite.toArray
      var sum = 0
      for (elem <- array) {
        sum = sum + elem
      }
      (x._1, sum)
    })

    val tuples = sumValueRDD.collect()

    for (elem <- tuples) {
      println(elem)
      //(A,8)
      //(B,7)
    }

    ss.stop()
  }
}

reduceByKey

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

/**
  * @date :2021/3/30 15:47
  * @author :xiaotao
  * @description :reduceByKey
  */
object reduceByKeyDemo {
  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.WARN)
    val ss: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName(this.getClass.getSimpleName)
      .getOrCreate()

    val arr = Array(("A", 1), ("B", 2), ("B", 5), ("A", 3), ("A", 4))
    val rdd = ss.sparkContext.parallelize(arr)

    val unit = rdd.reduceByKey(_ + _)
    for (elem <- unit.collect()) {
      println(elem)
      //(A,8)
      //(B,7)
    }
    ss.stop()
  }
}

上一篇:Spark中groupByKey和reduceByKey的区别


下一篇:Spark union