1.groupByKey:
它是将RDD中相同的key值得数据(value)合并成为一序列,只能输出相同key值得序列。
2.reduceByKey:
因为内部调用的combineByKey函数,会先进行局部聚合, 再进行全局聚合,(类似于mapreduce里的combine操作)这样会大大减少网络IO, 起到优化作用。
相同点:
- 都作用于 RDD[K,V]
- 都是根据key来分组聚合
- 默认,分区的数量都是不变的,但是都可以通过参数来指定分区数量
不同点:
- groupByKey默认没有聚合函数,得到的返回值类型是RDD[ k,Iterable[V]]
- reduceByKey 必须传聚合函数 得到的返回值类型 RDD[(K,聚合后的V)]
- groupByKey().map() = reduceByKey
最重要的区别:
- reduceByKey 会进行分区内聚合,然后再进行网络传输
- groupByKey 不会进行局部聚合
结论:
- 如果这两个算子,都可以使用, 优先使用reduceByKey
代码演示:
groupByKey
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
/**
* @date :2021/3/30 15:24
* @author :xiaotao
* @description :groupByKey
*/
object groupByKeyDemo {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.WARN)
val ss: SparkSession = SparkSession.builder()
.master("local[*]")
.appName(this.getClass.getSimpleName)
.getOrCreate()
val arr = Array(("A", 1), ("B", 2), ("B", 5), ("A", 3), ("A", 4))
val rdd = ss.sparkContext.parallelize(arr)
val groupByKeyRDD = rdd.groupByKey()
val sumValueRDD = groupByKeyRDD.map(x => {
val ite = x._2
val array = ite.toArray
var sum = 0
for (elem <- array) {
sum = sum + elem
}
(x._1, sum)
})
val tuples = sumValueRDD.collect()
for (elem <- tuples) {
println(elem)
//(A,8)
//(B,7)
}
ss.stop()
}
}
reduceByKey
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
/**
* @date :2021/3/30 15:47
* @author :xiaotao
* @description :reduceByKey
*/
object reduceByKeyDemo {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.WARN)
val ss: SparkSession = SparkSession.builder()
.master("local[*]")
.appName(this.getClass.getSimpleName)
.getOrCreate()
val arr = Array(("A", 1), ("B", 2), ("B", 5), ("A", 3), ("A", 4))
val rdd = ss.sparkContext.parallelize(arr)
val unit = rdd.reduceByKey(_ + _)
for (elem <- unit.collect()) {
println(elem)
//(A,8)
//(B,7)
}
ss.stop()
}
}