import org.apache.spark.{SparkConf, SparkContext}

object Test6 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
    val sc = new SparkContext(sparkConf)

    // distinct on primitive values: the duplicate 4 and 5 are removed
    val rdd = sc.parallelize(List(1, 2, 5, 7, 8, 9, 3, 4, 4, 5), 2)
    rdd.distinct().foreach(println)

    // distinct on a custom case class: deduplication is driven by the
    // overridden hashCode/equals below, i.e. by price only
    val rdd1 = sc.parallelize(List(hotel("lin", 32), hotel("lin", 17), hotel("lin", 9), hotel("lin", 2),
      hotel("lin", 29), hotel("long", 39), hotel("lin", 39), hotel("zhou", 39), hotel("sun", 39)), 2)
    rdd1.distinct().collect().foreach(println)

    sc.stop()
  }
}
/**
 * Case class with customized deduplication:
 * distinct removes duplicates based on the elements' hash codes (and equality),
 * so overriding hashCode/equals below makes price the only dedup criterion.
 */
case class hotel(name: String, price: Int) {
  // Hash only on price so that equal-priced hotels collide
  override def hashCode(): Int = this.price
  // Two hotels are equal when their prices match, regardless of name
  override def equals(obj: Any): Boolean = obj match {
    case hotel(_, otherPrice) => this.price == otherPrice
    case _ => false
  }
}
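With those overrides in place, two hotels that share a price are interchangeable as far as deduplication is concerned. A minimal sketch (plain Scala, no Spark; the object name HotelEqualityCheck is only illustrative) to confirm the semantics:

// Checks the overridden semantics locally: hotels with the same price are
// equal and share a hash code, so a Set built from both keeps only one.
object HotelEqualityCheck {
  def main(args: Array[String]): Unit = {
    val a = hotel("long", 39)
    val b = hotel("zhou", 39)
    println(a == b)                     // true: equals compares price only
    println(a.hashCode == b.hashCode)   // true: hashCode is the price
    println(Set(a, b).size)             // 1: the two values collapse to one
  }
}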
Under the hood, Spark's deduplication works in two steps: the no-argument distinct() is called first, and it simply delegates to the overload that takes a partition count:
/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(): RDD[T] = withScope {
  distinct(partitions.length)
}
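Because the no-arg overload just forwards partitions.length, the deduplicated RDD keeps the parent's partition count unless a different one is requested. A small check that could be dropped into the main method above (reusing the same sc; the variable name data is illustrative):

val data = sc.parallelize(1 to 10, 4)
println(data.distinct().getNumPartitions)    // 4: inherited via distinct()
println(data.distinct(2).getNumPartitions)   // 2: explicitly requested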
The parameterized distinct then does the actual work. When the RDD has no suitable partitioner, each element x is mapped to a (x, null) pair, reduceByKey merges the duplicate keys, and a final map extracts the keys again. When the RDD already has a partitioner and the partition count is unchanged, duplicates are removed inside each partition with an ExternalAppendOnlyMap and no extra shuffle is needed:
/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = {
    // Create an instance of external append only map which ignores values.
    val map = new ExternalAppendOnlyMap[T, Null, Null](
      createCombiner = _ => null,
      mergeValue = (a, b) => a,
      mergeCombiners = (a, b) => a)
    map.insertAll(partition.map(_ -> null))
    map.iterator.map(_._1)
  }
  partitioner match {
    case Some(_) if numPartitions == partitions.length =>
      mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)
    case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
  }
}
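The RDDs built with parallelize above carry no partitioner, so they go down the second branch. Spelling that branch out by hand is a handy way to remember that distinct costs one shuffle through reduceByKey; the sketch below (manualDistinct is an illustrative name) produces the same elements as rdd.distinct():

val manualDistinct = rdd
  .map(x => (x, null))        // every element becomes a key; the value is a placeholder
  .reduceByKey((x, _) => x)   // the shuffle merges duplicate keys, keeping one value
  .map(_._1)                  // drop the placeholder and return the keys
manualDistinct.collect().foreach(println)

The first branch, by contrast, only fires when the RDD already has a partitioner and the requested partition count is unchanged (for example the output of a reduceByKey). In that case equal elements are already co-located, so deduplication happens inside each partition via ExternalAppendOnlyMap and no additional shuffle is triggered.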