Spark's distinct (deduplication) operator

import org.apache.spark.{SparkConf, SparkContext}

object Test6 {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
    val sc = new SparkContext(sparkConf)

    // Deduplicate a plain RDD of Ints: distinct keeps one copy of each value.
    val rdd = sc.parallelize(List(1, 2, 5, 7, 8, 9, 3, 4, 4, 5), 2)
    rdd.distinct().foreach(println)

    // Deduplicate an RDD of case-class instances: distinct relies on the
    // elements' hashCode/equals, which the hotel class below overrides.
    val rdd1 = sc.parallelize(List(hotel("lin", 32), hotel("lin", 17), hotel("lin", 9), hotel("lin", 2),
      hotel("lin", 29), hotel("long", 39), hotel("lin", 39), hotel("zhou", 39), hotel("sun", 39)), 2)
    rdd1.distinct().collect().foreach(println)

    sc.stop()
  }

}
/**
 * Custom case class for deduplication.
 * distinct deduplicates elements by their hashCode and equals,
 * so overriding both lets us treat hotels with the same price as duplicates.
 */
case class hotel(name: String, price: Int) {
  // Hash on price only, so hotels with the same price land in the same bucket.
  override def hashCode(): Int = this.price

  // Two hotels are equal when their prices match, regardless of name.
  override def equals(obj: Any): Boolean = obj match {
    case hotel(_, otherPrice) => this.price == otherPrice
    case _ => false
  }
}
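With these overrides in place, a quick local check (a minimal sketch, not part of the original example; the object name is made up) shows why distinct collapses same-price hotels:

object HotelEqualityCheck {
  def main(args: Array[String]): Unit = {
    val a = hotel("long", 39)
    val b = hotel("zhou", 39)
    println(a == b)                   // true: equals compares only price
    println(a.hashCode == b.hashCode) // true: hashCode is the price itself
  }
}

Because both the hash and the equality test key on price alone, the nine hotels in rdd1 reduce to one record per distinct price.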

Under the hood, Spark's deduplication happens in two steps: the no-arg distinct() is called first, and it simply delegates to the parameterized distinct(numPartitions), passing the current partition count.


  /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(): RDD[T] = withScope {
    distinct(partitions.length)
  }
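Because distinct() just forwards partitions.length, the deduplicated RDD keeps the parent's partition count. A quick check against the rdd built in Test6 above (a sketch assuming the same sc and rdd are still in scope):

    println(rdd.getNumPartitions)            // 2
    println(rdd.distinct().getNumPartitions) // 2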

The parameterized distinct then branches on the partitioner. If the RDD already has a partitioner and the requested partition count is unchanged, duplicates are removed inside each partition with an ExternalAppendOnlyMap and an iterator over the surviving keys is returned. Otherwise each element is mapped to a (x, null) pair, reduceByKey merges the duplicates, and the keys are extracted (a hand-rolled equivalent of this branch is shown after the source below).

  /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = {
      // Create an instance of external append only map which ignores values.
      val map = new ExternalAppendOnlyMap[T, Null, Null](
        createCombiner = _ => null,
        mergeValue = (a, b) => a,
        mergeCombiners = (a, b) => a)
      map.insertAll(partition.map(_ -> null))
      map.iterator.map(_._1)
    }
    partitioner match {
      case Some(_) if numPartitions == partitions.length =>
        mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)
      case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
    }
  }
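For intuition, when the source RDD has no partitioner (the usual case), the reduceByKey branch above is the whole story. The hand-rolled equivalent mentioned earlier (a minimal sketch, assuming the same sc as in Test6) produces the same set of elements as distinct():

    val data = sc.parallelize(List(1, 2, 5, 7, 8, 9, 3, 4, 4, 5), 2)

    // Pair each element with null, merge duplicates by key, then drop the null.
    val manualDistinct = data
      .map(x => (x, null))
      .reduceByKey((x, _) => x)
      .map(_._1)

    println(manualDistinct.collect().toSet == data.distinct().collect().toSet) // true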