Spark基础篇-Spark-Core核心模型

第二章 Spark-Core核心模型

1.RDD

弹性分布式数据集(Resilient Distributed Dataset)是Spark中最基本的数据抽象。

  • 不可变(只读)

  • 可分区

  • 可并行计算

  • 自动容错

  • 位置感知性调度

RDD是Spark的核心抽象模型,本质上是一个抽象类。其源码中的重点部分如下:

abstract class RDD[T: ClassTag](
  @transient private var _sc: SparkContext,
  @transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {
  ......
   /**
   * :: DeveloperApi ::
   * Implemented by subclasses to compute a given partition.
   */
  @DeveloperApi
  def compute(split: Partition, context: TaskContext): Iterator[T]

  /**
   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   *
   * The partitions in this array must satisfy the following property:
   *   `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
   */
  protected def getPartitions: Array[Partition]

  /**
   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getDependencies: Seq[Dependency[_]] = deps

  /**
   * Optionally overridden by subclasses to specify placement preferences.
   */
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  /** Optionally overridden by subclasses to specify how they are partitioned. */
  @transient val partitioner: Option[Partitioner] = None

  // =======================================================================
  // Methods and fields available on all RDDs
  // =======================================================================

  /** The SparkContext that created this RDD. */
  def sparkContext: SparkContext = sc

  /** A unique ID for this RDD (within its SparkContext). */
  val id: Int = sc.newRddId()
  ......
}

RDD有五个属性,用来描述数据集的状态。

  • partitions 数据分区(抽象,由getPartitions提供)
  • compute 对分区计算的函数(抽象)
  • dependencies 依赖列表(可通过构造器传入,也可重写getDependencies)
  • partitioner 分区方式(具体,默认为None)
  • preferredLocations 优选位置(具体,默认为Nil)

注意,RDD的具体实现类中必须重写compute和getPartitions这两个抽象方法;依赖列表既可以通过构造器传入,也可以重写getDependencies。

思考1,RDD的具体实现类中后两个属性需要重写吗?

RDD的具体实现类如下:
(图略:RDD的部分具体实现类)

注意,虽然RDD的实现类很多,但只需要掌握抽象RDD中的五个重要属性即可。
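
为了直观理解这五个属性,下面给出一个极简的自定义RDD草图(仅作示意:LocalArrayRDD、ArrayPartition均为本文虚构的名字,省略了容错、优选位置、序列化开销等细节),它把一个本地数组切成若干分区:

import scala.reflect.ClassTag
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

//示意用的分区实现:只记录分区编号,不存储数据
class ArrayPartition(override val index: Int) extends Partition

//示意用的RDD实现:必须重写compute与getPartitions;无父RDD,因此依赖列表传入Nil
class LocalArrayRDD[T: ClassTag](sc: SparkContext, data: Array[T], numSlices: Int)
  extends RDD[T](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](numSlices)(i => new ArrayPartition(i))

  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    //按分区编号计算出该分区负责的数据区间
    val start = split.index * data.length / numSlices
    val end = (split.index + 1) * data.length / numSlices
    data.slice(start, end).iterator
  }
}

//用法示意:new LocalArrayRDD(sc, Array(1, 2, 3, 4, 5), 2).collect()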

2.数据分区

站在数据的角度思考RDD,RDD是由数据分区(partition)组成,这些分区运行在集群中的不同节点上。

  • 一个RDD可以包含多个分区
  • 一个分区就是一个dataset片段
  • 一个分区会被封装成一个Task

RDD内部数据组成如图:
(图略:RDD内部数据分区组成示意图)

数据分区源码实现如下:

/**
 * An identifier for a partition in an RDD.
 */
trait Partition extends Serializable {
  /**
   * Get the partition's index within its parent RDD
   */
  def index: Int
  // A better default implementation of HashCode
  override def hashCode(): Int = index
  override def equals(other: Any): Boolean = super.equals(other)
}

思考2,数据分区内部存储数据吗?RDD存储真正的数据吗?

数据分区内部并不会存储具体的数据。

  1. Partition类内包含一个index成员,表示该分区在RDD内的编号;
  2. 通过RDD编号+分区编号可以唯一确定该分区对应的数据块编号;
  3. 借助底层数据存储层提供的接口,就可以从存储介质(如:HDFS、Memory)中提取出分区对应的数据。
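
一个小示意(假设sc已创建,路径仅为示例):partitions返回的只是分区的元信息,可以看到RDD编号与分区编号,但看不到任何真实数据。

val rdd = sc.textFile("/opt/spark/README.md", 4)
rdd.partitions.foreach(p => println(s"RDD编号=${rdd.id}, 分区编号=${p.index}"))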

2.1构建RDD

2.1.1读取外部数据集

  • 文本文件

sc.textFile(path[,minPartitions])
sc.wholeTextFiles(path[,minPartitions])

  • 字节文件

sc.binaryFiles(path[,minPartitions])

  • 对象文件

sc.objectFile[T](path)

  • SequenceFile

sc.sequenceFile(path,keyClass,valueClass[,minPartitions])
sc.sequenceFile[K,V](path[,minPartitions])

  • Hadoop输入输出格式

sc.newAPIHadoopFile[Text,Text,KeyValueTextInputFormat](path)
sc.newAPIHadoopFile[K,V,F](path)

说明:

  • path可以是文件也可以是目录,也可以是带有匹配符号的路径。

  • minPartitions是指HadoopRDD的最小分区数;用户未给定时取defaultMinPartitions。注意,defaultMinPartitions使用math.min计算,因此不会大于2。

  • keyClass、valueClass是指数据文件中key、value的数据类型。

在源码中各个方法定义如下:

//1.文本文件
def textFile(
  path: String,
  minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
             minPartitions).map(pair => pair._2.toString).setName(path)
}

def wholeTextFiles(
  path: String,
  minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
  //....
}
//2.字节文件
def binaryFiles(
  path: String,
  minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withScope {
  //...
}
//3.对象文件
def objectFile[T: ClassTag](
  path: String,
  minPartitions: Int = defaultMinPartitions): RDD[T] = withScope {
  assertNotStopped()
  sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
  .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes, Utils.getContextOrSparkClassLoader))
}
//4.sequence文件
def sequenceFile[K, V](path: String,
                       keyClass: Class[K],
                       valueClass: Class[V],
                       minPartitions: Int
                      ): RDD[(K, V)] = withScope {
  assertNotStopped()
  val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
  hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
}

def sequenceFile[K, V](
  path: String,
  keyClass: Class[K],
  valueClass: Class[V]): RDD[(K, V)] = withScope {
  assertNotStopped()
  sequenceFile(path, keyClass, valueClass, defaultMinPartitions)
}
def sequenceFile[K, V]
(path: String, minPartitions: Int = defaultMinPartitions)
(implicit km: ClassTag[K], vm: ClassTag[V],
 kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = {
  withScope {
    assertNotStopped()
    val kc = clean(kcf)()
    val vc = clean(vcf)()
    val format = classOf[SequenceFileInputFormat[Writable, Writable]]
    val writables = hadoopFile(path, format,
                               kc.writableClass(km).asInstanceOf[Class[Writable]],
                               vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions)
    writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
  }
}
//5.newAPIHadoop文件
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
(path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope {
  newAPIHadoopFile(
    path,
    fm.runtimeClass.asInstanceOf[Class[F]],
    km.runtimeClass.asInstanceOf[Class[K]],
    vm.runtimeClass.asInstanceOf[Class[V]])
}
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
  path: String,
  fClass: Class[F],
  kClass: Class[K],
  vClass: Class[V],
  conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withScope {
  assertNotStopped()
  //...
}

/** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
def defaultParallelism: Int = {
  assertNotStopped()
  taskScheduler.defaultParallelism
}
/**
   * Default min number of partitions for Hadoop RDDs when not given by user
   * Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2.
   * The reasons for this are discussed in https://github.com/mesos/spark/pull/718
   */
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

2.1.2 对一个Seq并行化

sc.makeRDD(seq[,numPartition])

sc.parallelize(seq[,numPartition])

说明:

  • seq是指Seq集合
  • numPartition是指分区个数(并行度)

在源码中makeRDD和parallelize方法定义如下:

def makeRDD[T: ClassTag](
  seq: Seq[T],
  numSlices: Int = defaultParallelism): RDD[T] = withScope {
  parallelize(seq, numSlices)
}
def parallelize[T: ClassTag](
  seq: Seq[T],
  numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}

2.1.3案例

  1. 分别使用textFile和wholeTextFiles方法读取/opt/spark/README.md文件。
import org.apache.spark.{SparkConf, SparkContext}

object RDDTest1{
  def main(args: Array[String]): Unit = {
    //1.获取SparkConf对象
    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("案例1")
    //2.获取SparkContext对象
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")

    //3.构建RDD

    //3.1使用textFile读取文本文件
    val rdd1 = sc.textFile("/opt/spark/README.md")
    println(rdd1.count)
    println(rdd1.first)

    //3.2使用wholeTextFiles读取文本文件
    val rdd2 = sc.wholeTextFiles("/opt/spark/README.md")
    println(rdd2.count)
    println(rdd2.first)

    //5.关闭SparkContext对象
    sc.stop()
  }
}

观察,rdd1和rdd2调用相同方法输出有什么不同?
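
提示(基于两个方法的返回类型,供观察时参考):

val rdd1 = sc.textFile("/opt/spark/README.md")        //RDD[String]:每个元素是文件中的一行
val rdd2 = sc.wholeTextFiles("/opt/spark/README.md")  //RDD[(String, String)]:每个元素是(文件路径, 文件全部内容)
//因此rdd1.count统计的是行数,rdd2.count统计的是文件个数,first的输出也随之不同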

  2. 通过对Seq集合并行化构建RDD
import org.apache.spark.{SparkConf, SparkContext}

object RDDTest2{
  def main(args: Array[String]): Unit = {
    
    //1.获取SparkConf对象
    val conf=new SparkConf();
    conf.setMaster("local[*]")
    conf.setAppName("案例2")
    //2.获取SparkContext对象
    val sc=new SparkContext(conf);
    sc.setLogLevel("warn")
    
    //3.构建RDD
    //使用makeRDD将seq集合并行化
    val seq=1 to 10
    val rdd1=sc.makeRDD(seq)
    
    //4.1调用map方法将rdd1中每个元素+1,返回一个新的RDD
    val rdd2=rdd1.map(x=>x+1)
    //4.2将rdd2中的元素输出到控制台
    rdd2.foreach(println)
    
    //5.关闭SparkContext对象
    sc.stop()
    
  }
}

注意,通过以上两个案例的编写,发现每次都需要构建SparkContext对象,且代码基本一致。

思考3,是否可以封装SparkContext对象的构建过程?如果可以的话,如何实现?

借助于Scala里面的包对象实现封装:

package com

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark使用工具类
  * 方便构建使用Spark
  * */
package object briup {

  private var _conf:Option[SparkConf]=None;
  private var _sc:Option[SparkContext]=None;
  private var _spark:Option[SparkSession]=None;
  private var _ssc:Option[StreamingContext]=None;
  implicit val jarFilePath:Option[String]=None;

  /**
  	*
  	* 获取SparkConf对象
  	**/
  private def getConf(master:String,appName:String,checkPoint:String="spark-checkpoint"):SparkConf={
    _conf match{
      case Some(conf) => conf
      case None =>
        val conf=new SparkConf()
        conf.setMaster(master)
        conf.setAppName(appName)
        conf.set("spark.sql.streaming.checkpointLocation",checkPoint)
        _conf=Some(conf)
        conf
    }
  }
  
  /**
    * 获取SparkContext对象
    * */
  def getSparkContext(master:String,appName:String)(implicit jarFilePath:Option[String]=None):SparkContext={
    _sc match{
      case Some(sc) => sc
      case None =>
        val conf=getConf(master,appName)
        //第一种构建方式
        //    val sc=new SparkContext(conf);
        //第二种构建方式
        val sc=SparkContext.getOrCreate(conf);
      
        jarFilePath match {
          case Some(filepath) => sc.addJar(filepath)
          case None =>
        }
        _sc=Some(sc)
      	sc.setLogLevel("warn")
        sc
    }
  }
  
  /**
    * 获取SparkSession对象
    * */
  def getSpark(master:String,appName:String,checkPoint:String="spark-checkpoint")(implicit jarFilePath:Option[String]):SparkSession={
    _spark match{
      case Some(spark) =>
        //        println("...获取已经存在的Spark...")
        spark
      case None =>
        //        println("...开始创建Spark...")
        val conf=getConf(master,appName)
        val spark=SparkSession.builder().config(conf).getOrCreate();
        jarFilePath match {
          case Some(filepath) => spark.sparkContext.addJar(filepath)
          case None => //println("无jarFilePath......");
        }
        _spark=Some(spark)
        spark
    }
  }
  
  /**
    * 获取StreamingContext对象
    * */
  def getStreamingSpark(master:String,appName:String,batchDur:Duration)(implicit jarFilePath:Option[String]=None):StreamingContext={
    _ssc match{
      case Some(ssc) =>ssc
      case None =>
        val conf=getConf(master,appName)
        val ssc=new StreamingContext(conf,batchDur)
        jarFilePath match {
          case Some(filepath) => ssc.sparkContext.addJar(filepath)
          case None =>  //println("无jarFilePath......");
        }
        _ssc=Some(ssc)
        ssc
    }
  }
}
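
封装之后,前面的案例可以按如下方式简化(示意用法,RDDTest1Simplified为示意的类名,包名沿用上面定义的com.briup):

import com.briup._

object RDDTest1Simplified {
  def main(args: Array[String]): Unit = {
    //直接从工具包对象中获取SparkContext,省去重复的构建代码
    val sc = getSparkContext("local[*]", "案例1-简化版")

    val rdd1 = sc.textFile("/opt/spark/README.md")
    println(rdd1.count)

    sc.stop()
  }
}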

  3. 操作非文本数据文件

数据目录:hdfs://172.16.0.4:9000/data/grouplens/ml-1m/users.dat

数据说明:用户ID::性别::年龄::职业代码::邮编

编码实现:

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

object RDDTest3{
  def main(args: Array[String]): Unit = {
    //1.获取SparkConf对象
    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("案例3-各种数据练习")
    //2.获取SparkContext对象
    val sc = SparkContext.getOrCreate(conf)

    //3.获取RDD+4.RDD操作

    //1.读取文本文件构建RDD
    val rdd = sc.textFile("hdfs://172.16.0.4:9000/data/grouplens/ml-1m/users.dat")
    //2.输出到控制台
    rdd.foreach(println)
    //3.保存为对象文件 注意,该参数为路径名称
    rdd.saveAsObjectFile("users_obj")
    //4.读取对象文件
    val objectRDD = sc.objectFile[String]("users_obj")
    //5.输出到控制台
    objectRDD.foreach(println)
    //6.读取字节文件
    val binaryRDD = sc.binaryFiles("hdfs://172.16.0.4:9000/data/grouplens/ml-1m/users.dat")
    //7.输出到控制台
    binaryRDD.foreach(println)
    //8.保存为Sequence文件
    objectRDD.map(str => (str.length, str)).saveAsSequenceFile("users_seq")
    //9.读取Sequence文件
    val seqRDD = sc.sequenceFile[Int, String]("users_seq")
    //10.输出到控制台
    seqRDD.foreach(println)
    //11.保存为Hadoop格式的文件
    seqRDD.map(tu => (new IntWritable(tu._1), new Text(tu._2)))
      .saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, Text]]("users_hadoop")
    //12.读取Hadoop格式的文件
    val hadoopRDD = sc.newAPIHadoopFile[IntWritable, Text, SequenceFileInputFormat[IntWritable, Text]]("users_hadoop")
    //13.输出到控制台
    hadoopRDD.foreach(println)

    //5.关闭SparkContext对象
    sc.stop()
  }
}

思考4,学会了如何构建RDD,如何查看RDD中的分区个数?

2.2分区个数

获取分区个数:rdd对象.getNumPartitions

演示:

import org.apache.spark.{SparkConf, SparkContext}

object RDDTest2{
  def main(args: Array[String]): Unit = {
    //1.获取SparkConf对象
    val conf=new SparkConf();
    conf.setMaster("local[*]")
    conf.setAppName("演示1-分区个数")
    //2.获取SparkContext对象
    val sc=SparkContext.getOrCreate(conf)

    
    //3.构建RDD
    val rdd=sc.textFile("hdfs://172.16.0.4/data/grouplens/ml-1m/users.dat")
    //3.1查看RDD的分区个数
    println(rdd.getNumPartitions)
    
    //5.关闭SparkContext对象
    sc.stop()
  }
}

注意,分区个数会决定Stage中Task的个数,分区个数是Spark任务调度中的并行度。

思考5,如何设置RDD的分区个数?

  • 获取RDD时指定
    • 读取外部文件时,可选参数minPartitions
      • 不指定时为math.min(Spark任务调度中的并行度,2)
      • minPartitions是指用户未给定时HadoopRDD的默认最小分区数
    • 并行Seq集合时,可选参数numPartition
      • 不指定时为Spark任务调度中的并行度
      • numPartition是指分区数
  • 重分区(调整RDD的分区个数)
    • rdd.repartition(numPartitions:Int)
    • rdd.coalesce(numPartitions:Int,shuffle:Boolean)
    • pairRdd.repartitionAndSortWithinPartitions(partitioner: Partitioner)

重分区的方法源码如下:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}

def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]

    // include a shuffle step so that our upstream tasks are still distributed
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](
        mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
        new HashPartitioner(numPartitions)),
      numPartitions,
      partitionCoalescer).values
  } else {
    new CoalescedRDD(this, numPartitions, partitionCoalescer)
  }
}

def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
  new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
}

思考6,重分区一定需要通过网络混洗吗?

注意:

  • 对于repartition和coalesce方法:当需要调小分区个数时,Spark建议优先使用coalesce,它可以看作repartition的优化版,能够避免Shuffle
  • 如果重分区之后需要对分区内的数据进行排序,Spark建议使用repartitionAndSortWithinPartitions

演示:


import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object RDDTest2{
  def main(args: Array[String]): Unit = {
    //1.获取SparkConf对象
    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("演示2-分区个数")
    //2.获取SparkContext对象
    val sc = SparkContext.getOrCreate(conf)

    //3.1构建RDD时指定分区个数
    val rdd1 = sc.textFile("hdfs://172.16.0.4/data/grouplens/ml-1m/users.dat")
    println(rdd1.getNumPartitions)

    val rdd2 = sc.textFile("hdfs://172.16.0.4/data/grouplens/ml-1m/users.dat", 5)
    println(rdd2.getNumPartitions)

    val rdd3 = sc.makeRDD(1 to 10)
    println(rdd3.getNumPartitions)

    val rdd4 = sc.makeRDD(1 to 10, 4)
    println(rdd4.getNumPartitions)

    //3.2重分区调整分区个数
    println(s"重分区 repartition 前:分区个数:${rdd1.getNumPartitions}")
    val re_rdd1 = rdd1.repartition(3)
    println(s"重分区 repartition 后:分区个数:${re_rdd1.getNumPartitions}")

    println("-----调小分区个数-------")
    println(s"重分区 coalesce 前:分区个数:${rdd2.getNumPartitions}")
    val co_rdd2 = rdd2.coalesce(3)
    println(s"重分区 coalesce 后:分区个数:${co_rdd2.getNumPartitions}")

    println("-----调大分区个数-------")
    println(s"重分区 coalesce 前:分区个数:${rdd2.getNumPartitions}")
    //shuffle默认为false,此时分区个数不会变大;如需调大可使用coalesce(10, shuffle = true)
    val co_rdd3 = rdd2.coalesce(10)
    println(s"重分区 coalesce 后:分区个数:${co_rdd3.getNumPartitions}")

    println(s"重分区 repartitionAndSortWithinPartitions 前:分区个数:${rdd4.getNumPartitions}")
    val partitioner = new HashPartitioner(5)
    val ras_rdd4 = rdd4.map(x => (x, 1)).repartitionAndSortWithinPartitions(partitioner)
    println(s"重分区 repartitionAndSortWithinPartitions 后:分区个数:${ras_rdd4.getNumPartitions}")

    //5.关闭SparkContext对象
    sc.stop()
  }
}

思考7,coalesce方法中第二个参数的含义?coalesce优于repartition的原因?

原因:

  • repartition操作一定会产生Shuffle操作,网络开销大,性能降低。
  • coalesce针对调小分区个数可以不用产生Shuffle操作,故可以节省网络开销,提高效率。

思考8,coalesce针对调大分区个数一定要产生Shuffle操作吗?为什么?

原因:

  • 调小分区个数,可以让多个分区直接进行合并为一个大分区,可以不需要Shuffle。
  • 调大分区个数,需要将一个分区内的数据打乱分发到多个分区,必须借助于Shuffle。
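
下面的小片段可以直观验证这一结论(示意代码,假设sc已创建):

val rdd = sc.makeRDD(1 to 100, 4)
println(rdd.coalesce(2).getNumPartitions)                  //2:调小分区,无需Shuffle
println(rdd.coalesce(10).getNumPartitions)                 //仍为4:shuffle=false时无法调大
println(rdd.coalesce(10, shuffle = true).getNumPartitions) //10:借助Shuffle才能调大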

3.分区计算

Spark中RDD的计算是以分区为单位的,每个RDD都会实现compute函数以达到这个目的。

/**
  * :: DeveloperApi ::
  * Implemented by subclasses to compute a given partition.
  */
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]

注意,该方法属于开发者API(DeveloperApi),由Spark框架在执行任务时调用,编写Spark应用的普通开发人员一般不会直接使用它。
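
虽然不能直接调用compute,但可以通过mapPartitionsWithIndex等算子以分区为单位编写计算逻辑,效果上同样是"对每个分区进行计算"(示意代码,假设sc已创建):

val rdd = sc.makeRDD(1 to 10, 3)
//func的签名为(分区编号, 该分区数据的迭代器) => 新的迭代器
val result = rdd.mapPartitionsWithIndex { (index, iter) =>
  iter.map(x => s"分区$index -> 元素$x")
}
result.foreach(println)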

4.依赖列表

掌握依赖之前先了解下RDD操作。

4.1RDD操作

4.1.1转化算子

返回一个RDD的操作,延迟计算。

常见的转换算子如下:

  1. 基于元素进行操作
    • map(func):将每个原元素经过func函数转换后返回一个新元素,由这些新元素组成一个新的分布式数据集
    • flatMap(func):类似于map,但是每一个输入元素会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)
  2. 基于分区进行操作
    • mapPartitions(func):类似于map,但在RDD的每个分区(块)上分别运行,因此在元素类型为T的RDD上运行时,func必须是Iterator[T] => Iterator[U]类型
    • mapPartitionsWithIndex(func):与mapPartitions类似,但还会为func提供一个表示分区索引的整数,因此在元素类型为T的RDD上运行时,func必须是(Int, Iterator[T]) => Iterator[U]类型
  3. 聚合操作
    • reduceByKey([partitioner,]func[,numPartitions]):按key对value进行func聚合
    • foldByKey(defaultValue[,numPartitions|partitioner])(func):按key进行func聚合,可以设置初始值
    • combineByKey[C](fun1,fun2,fun3):按key聚合。fun1在分区内key第一次出现时调用;fun2在分区内key再次出现时调用;fun3在分区间合并相同key的结果时调用(见本列表后的示意代码)
  4. 分组操作
    • groupByKey([partitioner|numPartitions]):按key进行分组,可以指定分区个数或分区方式
    • groupBy(func[,numPartitions|partitioner]):按func的返回值进行分组,可以指定分区个数或分区方式
    • groupWith(otherRDD*):在类型为(K,V)和(K,W)的数据集上调用,返回一个由(K, Seq[V], Seq[W])元组组成的数据集。这个操作在其它框架中称为CoGroup
  5. 连接操作
    • join(other):内连接
    • rightOuterJoin(other):右外连接
    • leftOuterJoin(other):左外连接
    • fullOuterJoin(other):全外连接
    • cogroup(other):分组连接
    • subtractByKey(other):按key求差集
  6. 排序操作
    • sortByKey([ascending]):按key排序,默认为升序;ascending=false为降序,true为升序
    • sortBy(func[,ascending]):按func的返回值排序,默认为升序;ascending=false为降序,true为升序
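
其中combineByKey的三个函数参数最容易混淆,下面用常见的"按key求平均值"写法做一个小示意(示意代码,假设sc已创建):

val scores = sc.makeRDD(Seq(("a", 90), ("a", 80), ("b", 70), ("b", 60), ("b", 50)))
val avg = scores.combineByKey(
  (v: Int) => (v, 1),                                           //fun1:分区内key第一次出现,建立(总和, 个数)
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),        //fun2:分区内key再次出现,累加
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)  //fun3:分区间合并相同key的结果
).mapValues { case (sum, cnt) => sum.toDouble / cnt }
avg.foreach(println)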

4.1.2行动算子

返回一个结果或者写到文件系统中的操作,行动算子会触发转化算子进行计算。

常见的行动算子如下:

  1. 获取部分元素
    • first:获取第一个元素(类似于take(1))
    • max:获取最大元素
    • min:获取最小元素
    • top(num):按默认排序取最大的前num个元素,返回一个数组
    • take(num):返回一个数组,由数据集的前num个元素组成。注意,这个操作目前并非在多个节点上并行执行,而是由Driver程序所在机器单机收集这些元素(Driver的内存压力会增大,需要谨慎使用)
    • takeOrdered(num[,ordering]):使用RDD的自然顺序或自定义比较器,返回RDD的前num个元素
    • takeSample(withReplacement,num[,seed]):返回一个数组,其中包含数据集中num个元素的随机样本,可以选择有放回或无放回抽样,还可以预先指定随机数生成器种子
  2. 规约操作
    • reduce(func):依次将元素根据func执行二元操作;func接受2个参数、返回一个值,且必须满足结合律,才能被正确地并行执行
    • fold(defaultValue)(func):在reduce的基础上增加初始值
    • aggregate[U](defaultValue)(func1,func2):分区内执行func1,分区间执行func2(见本列表后的示意代码)
  3. 输出到外部系统
    • saveAsTextFile(path):保存为文本文件
    • saveAsObjectFile(path):保存为对象文件
    • saveAsSequenceFile(path)(Java and Scala):保存为SequenceFile
    • saveAsHadoopFile[OutputFormat[Key,Value]](path):保存为Hadoop格式文件
    • foreach(func):遍历元素执行func
    • foreachPartition(func):遍历分区执行func
  4. 其他操作
    • collect():在Driver程序中以数组的形式返回数据集的所有元素。这通常在使用filter或其它操作得到一个足够小的数据子集之后再使用;直接对整个RDD执行collect,很可能会让Driver程序OOM
    • count():返回数据集的元素个数
    • countByValue:对每个元素分别计数
    • countByKey:对每个键对应的元素分别计数
    • collectAsMap:将结果以映射表(Map)的形式返回
    • lookup(key):返回给定键对应的所有值
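
以aggregate为例,用(总和, 个数)作为初始值同时完成求和与计数(示意代码,假设sc已创建):

val nums = sc.makeRDD(1 to 10, 3)
//初始值为(0, 0);func1在分区内累加元素,func2在分区间合并中间结果
val (sum, count) = nums.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),
  (a, b) => (a._1 + b._1, a._2 + b._2)
)
println(s"平均值:${sum.toDouble / count}")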

注意,每当我们调用一个新的行动算子时,整个RDD默认都会从头开始计算;如果需要复用中间结果,应当先对其进行缓存(见下节)。

4.1.3缓存操作

在执行多个查询操作时,可以将RDD缓存在内存中,后续的其他查询就可以重用RDD,来提升查询速度。

相关方法:

  • cache()
  • persist([持久化级别])
  • unpersist()

持久化级别(Storage Level)及含义:

  • MEMORY_ONLY:将RDD作为反序列化的Java对象存储在JVM中。如果RDD无法完全放入内存,则部分分区不会被缓存,每次需要时会重新计算。这是默认级别。
  • MEMORY_AND_DISK:将RDD作为反序列化的Java对象存储在JVM中。如果RDD无法完全放入内存,则把放不下的分区存储到磁盘上,需要时再从磁盘读取。
  • MEMORY_ONLY_SER(Java and Scala):将RDD存储为序列化的Java对象(每个分区一个字节数组)。这通常比存储反序列化对象更节省空间,尤其是配合fast serializer(http://spark.apache.org/docs/latest/tuning.html)使用时,但读取时更消耗CPU。
  • MEMORY_AND_DISK_SER(Java and Scala):类似MEMORY_ONLY_SER,但会把内存放不下的分区溢写到磁盘,而不是每次需要时重新计算。
  • DISK_ONLY:仅在磁盘上存储RDD分区。
  • MEMORY_ONLY_2、MEMORY_AND_DISK_2等:与上面对应的级别相同,但每个分区会在两个集群节点上各保存一份副本。
  • OFF_HEAP(experimental):类似MEMORY_ONLY_SER,但将数据存储在堆外内存(http://spark.apache.org/docs/latest/configuration.html#memory-management)中,需要启用堆外内存。

注意,cache/persist本身是惰性的:调用时只是为RDD打上持久化标记,并不会立即缓存数据;直到遇到第一个行动算子真正计算该RDD时,数据才会被缓存下来。
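
一个简单的缓存示意(假设sc已创建,路径仅为示例):同一个RDD被多个行动算子使用时,缓存可以避免重复从头计算。

val sparkLines = sc.textFile("/opt/spark/README.md").filter(_.contains("Spark"))
sparkLines.cache()              //等价于persist(StorageLevel.MEMORY_ONLY),此处只是打上缓存标记
println(sparkLines.count())     //第一个行动算子触发计算,并把计算结果缓存到内存
println(sparkLines.first())     //第二个行动算子直接复用缓存,不再重新读文件计算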

4.1.4案例

案例:统计出用户表中男女用户的人数。

数据目录:hdfs://172.16.0.4/data/grouplens/ml-1m/users.dat

数据说明:用户ID::性别::年龄::职业代码::邮编

代码实现:

import org.apache.spark.{SparkConf, SparkContext}

object MovieTest{
  def main(args: Array[String]): Unit = {
    //获取SparkConf对象
    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("案例-统计男女用户人数")
    //获取SparkContext对象
    val sc = SparkContext.getOrCreate(conf)
    
    //1.读取数据集
    val userRDD=sc.textFile("hdfs://172.16.0.4/data/grouplens/ml-1m/users.dat")
    //2.将每行字符串按照::进行分隔,并获取分割之后的第二部分,即性别
    val mapRDD=userRDD.map(line=>line.split("::")(1))
    //3.将每个元素转化为二元元组,第一元为是每个元素,第二元为数字1
    val map2RDD=mapRDD.map(gender=>(gender,1))
    //4.将map2RDD中每个元素按照key进行分组求和,即获取男女用户的人数
    val resultRDD=map2RDD.reduceByKey(_+_);
    //val resultRDD=map2RDD.groupByKey.mapValues(value=>value.reduce(_+_));
    //5.将结果输出到控制台
    resultRDD.foreach(println)
    //6.将结果存储到文本文件中
    resultRDD.saveAsTextFile("userNumByGender_result_1")
    
    //关闭资源
    sc.stop()
    
  }
}

思考,如果要在上述代码中添加缓存操作来提高效率,缓存代码应该添加到哪儿?

思考,结合上案例中第4步的两种写法(reduceByKey与注释掉的groupByKey+mapValues),请描述reduceByKey和groupByKey的区别?

4.2依赖

抽象RDD采用结构型设计模式中的装饰器(包装器)模式设计。下图根据第一章《初识Spark》中的词频统计案例绘制。
(图略:词频统计案例中RDD的层层包装关系)

由于RDD转化算子的返回值是一个新的RDD,新RDD和原RDD之间存在一种关系,这种关系就称为依赖。

不同的算子依据其特性,可能会产生不同的依赖。

  • 例如map操作会产生narrow dependency
  • 而join操作一般则产生wide dependency

依赖关系分类:

  • 窄依赖
    • 父RDD一个数据分区只被子RDD的一个数据分区所使用
  • 宽依赖
    • 父RDD一个数据分区被子RDD的多个数据分区所使用

案例演示依赖关系如下:
(图略:窄依赖与宽依赖的案例演示)

思考,借助于依赖关系,可以完成哪些事情?

提示:

  • 任务机制

    • 根据依赖关系获取DAG,从而进行Stage划分,产生TaskSets。
  • 容错机制

    • 在部分分区数据丢失时,可以通过依赖关系重新计算丢失的分区数据,而不需要对所有分区进行重新计算。
    • 在Spark中一般称为:lineage、谱系图、血缘系统。
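
可以通过toDebugString直接打印一个RDD的谱系(示意代码,假设sc已创建,路径仅为示例):

val lineageRDD = sc.textFile("/opt/spark/README.md")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
//输出该RDD的依赖链,其中新的缩进分支通常对应一次Shuffle(宽依赖)
println(lineageRDD.toDebugString)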

依赖对象源代码如下:

/**
 * :: DeveloperApi ::
 * Base class for dependencies.
 */
@DeveloperApi
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

4.2.1窄依赖

父RDD一个数据分区只被子RDD的一个数据分区所使用。

具体子类有:

OneToOneDependency
RangeDependency

窄依赖以及其具体子类源代码如下:

/**
 * :: DeveloperApi ::
 * Base class for dependencies where each partition of the child RDD depends on a small number
 * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
 */
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  /**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}
/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between partitions of the parent and child RDDs.
 */
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD
 * @param outStart the start of the range in the child RDD
 * @param length the length of the range
 */
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

思考,在案例演示依赖关系图中,窄依赖的三个案例中,哪些是OneToOneDependency?哪些是RangeDependency?

答案,案例1、案例3是OneToOneDependency;案例2是RangeDependency。
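
也可以直接通过dependencies属性查看一个RDD的依赖类型(示意代码,假设sc已创建):

val base = sc.makeRDD(1 to 10, 2)
println(base.map(_ + 1).dependencies)                          //OneToOneDependency
println(base.union(base).dependencies)                         //两个RangeDependency
println(base.map(x => (x, 1)).reduceByKey(_ + _).dependencies) //ShuffleDependency(宽依赖)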

4.2.2宽依赖

父RDD一个数据分区被子RDD的多个数据分区所使用。

/**
 * :: DeveloperApi ::
 * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
 * the RDD is transient since we don't need it on the executor side.
 *
 * @param _rdd the parent RDD
 * @param partitioner partitioner used to partition the shuffle output
 * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If not set
 *                   explicitly then the default serializer, as specified by `spark.serializer`
 *                   config option, will be used.
 * @param keyOrdering key ordering for RDD's shuffles
 * @param aggregator map/reduce-side aggregator for RDD's shuffle
 * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
 * @param shuffleWriterProcessor the processor to control the write behavior in ShuffleMapTask
 */
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false,
    val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
  extends Dependency[Product2[K, V]] {

  if (mapSideCombine) {
    require(aggregator.isDefined, "Map-side combine without Aggregator specified!")
  }
  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
  _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
}

4.3有向无环图

在任务执行机制中,RDD之间的依赖关系会组织成一张有向无环图(DAG,Directed Acyclic Graph)。

根据以下步骤编写代码,来查看有向无环图。

  1. 读取一个日志文件
  2. 过滤出来包含error的日志数据
  3. 在第一步的基础上,过滤出来包含 warning的日志数据
  4. 将第二步和第三步的RDD合并
  5. 获取满足条件的数据个数
  6. 获取十条满足条件的数据,并输出到控制台
import org.apache.spark.{SparkConf, SparkContext}

object Test{
  def main(args: Array[String]): Unit = {
    //获取SparkConf对象
    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("演示-有向无环图")
    //获取SparkContext对象
    val sc = SparkContext.getOrCreate(conf)

    //1.读取一个日志文件
    val inputRDD = sc.textFile("log.txt")
    //2.过滤出包含error的日志数据
    val errorRDD = inputRDD.filter(line => line.contains("error"))
    //3.过滤出包含warning的日志数据
    val warningRDD = inputRDD.filter(line => line.contains("warning"))
    //4.将两个RDD合并
    val badLinesRDD = errorRDD.union(warningRDD)

    //5.获取满足条件的数据个数
    println("Input had " + badLinesRDD.count + " concerning lines")
    //6.获取十条满足条件的数据,并输出到控制台
    println("Here are 10 examples:")
    badLinesRDD.take(10).foreach(println)

    //关闭资源
    sc.stop()
  }
}

浏览Spark的WEB页面,查看其中的有向无环图。

(图略:Spark WEB页面中的有向无环图)

思考,结合RDD中采用装饰模式的设计理念,绘制该App中涉及到的RDD转化图形。

5.分区方式

  • Spark程序通过控制RDD分区方式来减少通信开销
  • 根据key控制每个分区内的数据
  • 只有key-value的RDD,才可能会有partitioner
  • 非key-value的RDD的partitioner返回值是None

分区方式源代码如下:

/**
 * An object that defines how the elements in a key-value pair RDD are partitioned by key.
 * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
 *
 * Note that, partitioner must be deterministic, i.e. it must return the same partition id given the same partition key.
 */
abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

object Partitioner {
/**
   * Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
   *
   * If spark.default.parallelism is set, we'll use the value of SparkContext defaultParallelism
   * as the default partitions number, otherwise we'll use the max number of upstream partitions.
   *
   * When available, we choose the partitioner from rdds with maximum number of partitions. If this
   * partitioner is eligible (number of partitions within an order of maximum number of partitions
   * in rdds), or has partition number higher than or equal to default partitions number - we use
   * this partitioner.
   *
   * Otherwise, we'll use a new HashPartitioner with the default partitions number.
   *
   * Unless spark.default.parallelism is set, the number of partitions will be the same as the
   * number of partitions in the largest upstream RDD, as this should be least likely to cause
   * out-of-memory errors.
   *
   * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
   */
  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val rdds = (Seq(rdd) ++ others)
    val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))

    val hasMaxPartitioner: Option[RDD[_]] = if (hasPartitioner.nonEmpty) {
      Some(hasPartitioner.maxBy(_.partitions.length))
    } else {
      None
    }

    val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
      rdd.context.defaultParallelism
    } else {
      rdds.map(_.partitions.length).max
    }

    // If the existing max partitioner is an eligible one, or its partitions number is larger
    // than or equal to the default number of partitions, use the existing partitioner.
    if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
        defaultNumPartitions <= hasMaxPartitioner.get.getNumPartitions)) {
      hasMaxPartitioner.get.partitioner.get
    } else {
      new HashPartitioner(defaultNumPartitions)
    }
  }

  /**
   * Returns true if the number of partitions of the RDD is either greater than or is less than and
   * within a single order of magnitude of the max number of upstream partitions, otherwise returns
   * false.
   */
  private def isEligiblePartitioner(
     hasMaxPartitioner: RDD[_],
     rdds: Seq[RDD[_]]): Boolean = {
    val maxPartitions = rdds.map(_.partitions.length).max
    log10(maxPartitions) - log10(hasMaxPartitioner.getNumPartitions) < 1
  }
}

5.1分区对象

5.1.1HashPartitioner

/**
 * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
 * Java's `Object.hashCode`.
 *
 * Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
 * so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
 * produce an unexpected or incorrect result.
 */
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

5.1.2RangePartitioner

/**
 * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
 * equal ranges. The ranges are determined by sampling the content of the RDD passed in.
 *
 * @note The actual number of partitions created by the RangePartitioner might not be the same
 * as the `partitions` parameter, in the case where the number of sampled records is less than
 * the value of `partitions`.
 */
class RangePartitioner[K : Ordering : ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true,
    val samplePointsPerPartitionHint: Int = 20)
  extends Partitioner {

  // A constructor declared in order to maintain backward compatibility for Java, when we add the
  // 4th constructor parameter samplePointsPerPartitionHint. See SPARK-22160.
  // This is added to make sure from a bytecode point of view, there is still a 3-arg ctor.
  def this(partitions: Int, rdd: RDD[_ <: Product2[K, V]], ascending: Boolean) = {
    this(partitions, rdd, ascending, samplePointsPerPartitionHint = 20)
  }

  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")
  require(samplePointsPerPartitionHint > 0,
    s"Sample points per partition must be greater than 0 but found $samplePointsPerPartitionHint")

  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      Array.empty
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      // Cast to double to avoid overflowing ints or longs
      val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        Array.empty
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        val candidates = ArrayBuffer.empty[(K, Float)]
        val imbalancedPartitions = mutable.Set.empty[Int]
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            imbalancedPartitions += idx
          } else {
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.length).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
      }
    }
  }

  def numPartitions: Int = rangeBounds.length + 1

  private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

  def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[K]
    var partition = 0
    if (rangeBounds.length <= 128) {
      // If we have less than 128 partitions naive search
      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition += 1
      }
    } else {
      // Determine which binary search method to use only once.
      partition = binarySearch(rangeBounds, k)
      // binarySearch either returns the match location or -[insertion point]-1
      if (partition < 0) {
        partition = -partition-1
      }
      if (partition > rangeBounds.length) {
        partition = rangeBounds.length
      }
    }
    if (ascending) {
      partition
    } else {
      rangeBounds.length - partition
    }
  }

  override def equals(other: Any): Boolean = other match {
    case r: RangePartitioner[_, _] =>
      r.rangeBounds.sameElements(rangeBounds) && r.ascending == ascending
    case _ =>
      false
  }

  override def hashCode(): Int = {
    val prime = 31
    var result = 1
    var i = 0
    while (i < rangeBounds.length) {
      result = prime * result + rangeBounds(i).hashCode
      i += 1
    }
    result = prime * result + ascending.hashCode
    result
  }

  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
    val sfactory = SparkEnv.get.serializer
    sfactory match {
      case js: JavaSerializer => out.defaultWriteObject()
      case _ =>
        out.writeBoolean(ascending)
        out.writeObject(ordering)
        out.writeObject(binarySearch)

        val ser = sfactory.newInstance()
        Utils.serializeViaNestedStream(out, ser) { stream =>
          stream.writeObject(scala.reflect.classTag[Array[K]])
          stream.writeObject(rangeBounds)
        }
    }
  }

  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
    val sfactory = SparkEnv.get.serializer
    sfactory match {
      case js: JavaSerializer => in.defaultReadObject()
      case _ =>
        ascending = in.readBoolean()
        ordering = in.readObject().asInstanceOf[Ordering[K]]
        binarySearch = in.readObject().asInstanceOf[(Array[K], K) => Int]

        val ser = sfactory.newInstance()
        Utils.deserializeViaNestedStream(in, ser) { ds =>
          implicit val classTag = ds.readObject[ClassTag[Array[K]]]()
          rangeBounds = ds.readObject[Array[K]]()
        }
    }
  }
}

private[spark] object RangePartitioner {

  /**
   * Sketches the input RDD via reservoir sampling on each partition.
   *
   * @param rdd the input RDD to sketch
   * @param sampleSizePerPartition max sample size per partition
   * @return (total number of items, an array of (partitionId, number of items, sample))
   */
  def sketch[K : ClassTag](
      rdd: RDD[K],
      sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
    val shift = rdd.id
    // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
      val seed = byteswap32(idx ^ (shift << 16))
      val (sample, n) = SamplingUtils.reservoirSampleAndCount(
        iter, sampleSizePerPartition, seed)
      Iterator((idx, n, sample))
    }.collect()
    val numItems = sketched.map(_._2).sum
    (numItems, sketched)
  }

  /**
   * Determines the bounds for range partitioning from candidates with weights indicating how many
   * items each represents. Usually this is 1 over the probability used to sample this candidate.
   *
   * @param candidates unordered candidates with weights
   * @param partitions number of partitions
   * @return selected bounds
   */
  def determineBounds[K : Ordering : ClassTag](
      candidates: ArrayBuffer[(K, Float)],
      partitions: Int): Array[K] = {
    val ordering = implicitly[Ordering[K]]
    val ordered = candidates.sortBy(_._1)
    val numCandidates = ordered.size
    val sumWeights = ordered.map(_._2.toDouble).sum
    val step = sumWeights / partitions
    var cumWeight = 0.0
    var target = step
    val bounds = ArrayBuffer.empty[K]
    var i = 0
    var j = 0
    var previousBound = Option.empty[K]
    while ((i < numCandidates) && (j < partitions - 1)) {
      val (key, weight) = ordered(i)
      cumWeight += weight
      if (cumWeight >= target) {
        // Skip duplicate values.
        if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
          bounds += key
          target += step
          j += 1
          previousBound = Some(key)
        }
      }
      i += 1
    }
    bounds.toArray
  }
}

5.1.3自定义Partitioner

继承(extends)Partitioner并重写numPartitions和getPartition两个抽象方法即可。

案例:


import org.apache.spark.Partitioner

case class Student(name: String, age: Int, gender: String)

class GenderPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = key match {
    case null => 0
    //按性别的hashCode取非负余数,保证分区编号落在[0, numPartitions)范围内
    case stu: Student => (stu.gender.hashCode % partitions + partitions) % partitions
    case _ => throw new UnsupportedOperationException("不支持的key类型")
  }

  override def hashCode(): Int = partitions

  override def equals(o: Any): Boolean = o match {
    case x: GenderPartitioner => x.numPartitions == partitions
    case _ => false
  }
}

5.2分区方式

查看RDD分区方式:rdd.partitioner

思考,如何让rdd有分区方式?

回答:

  • 预定义分区
    • 调用专门的分区方法partitionBy(partitioner)
  • 分区操作
    • 调用本身带有分区方式的算子
      • cogroup()、groupWith()、join()、leftOuterJoin()、rightOuterJoin()、 groupByKey()、reduceByKey()、combineByKey()、partitionBy()、sortByKey()、(如果父RDD有分区方式的话,filter、mapValues、flatMapValues)等,其他操作生成结果都不会存在特定分区方式。
      • 如果RDD在调用以上操作之前已经具有的分区方式,则以上操作都能够从分区中获益。
      • 此时以上方法就不会进行数据混洗,减少了数据混洗的开销。

演示代码:

import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object PartitionerTest {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("分区方式练习")
    val sc = SparkContext.getOrCreate(conf)

    val seq = Seq(
      Student("larry", 56, "男"),
      Student("renen", 50, "女"),
      Student("kevin", 46, "男"),
      Student("tarry", 36, "男"),
      Student("mark", 32, "男")
    )
    val rdd: RDD[Student] = sc.makeRDD(seq, 3)
    val pairRDD: RDD[(Student, Int)] = rdd.map(stu => stu -> stu.age)

    println(s"分区方式:${pairRDD.partitioner}")
    println(s"分区个数:${pairRDD.getNumPartitions}")

    //如何让rdd有分区方式?

    //1.专门分区方式方法 partitionBy

    //1.1HashPartitioner
    val partitioner = new HashPartitioner(5)
    //1.2RangePartitioner(要求key类型存在Ordering,Student没有,因此此处仅作注释示意)
    //val partitioner = new RangePartitioner(2, pairRDD)
    //1.3自定义分区
    //val partitioner = new GenderPartitioner(2)

    val partitionerRDD = pairRDD.partitionBy(partitioner)
    println(s"分区方式:${partitionerRDD.partitioner}")
    println(s"分区个数:${partitionerRDD.getNumPartitions}")

    //2.调用带有分区方式的功能方法
    val nameRDD: RDD[(String, Student)] = rdd.map(stu => stu.name -> stu)
    val groupRDD: RDD[(String, Iterable[Student])] = nameRDD.groupByKey()
    //val groupRDD = nameRDD.groupByKey(new RangePartitioner(6, nameRDD))
    println(s"分区方式:${groupRDD.partitioner}")
    println(s"分区个数:${groupRDD.getNumPartitions}")

    val sortRDD: RDD[(String, Student)] = nameRDD.sortByKey()
    println(s"分区方式:${sortRDD.partitioner}")
    println(s"分区个数:${sortRDD.getNumPartitions}")

    //请分析以下两行代码的区别:
    groupRDD.map(elem => (elem._1, elem._2.map(_.age))).sortByKey().foreach(println)
    groupRDD.mapValues(elem => elem.map(_.age)).sortByKey().foreach(println)

    //关闭资源
    sc.stop()
  }
}

说明:

  • 对于二元操作,输出数据的分区方式取决于父RDD的分区方式。
  • 默认情况下,结果采用哈希分区,分区的数量和并行度一样。
  • 如果其中一个父RDD已经设置过分区方式,那么结果就会采用那种分区方式。
  • 如果两个父RDD都设置过分区方式,结果RDD采用第一个父RDD的分区方式。
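
一个简单的二元操作示意(示意代码,假设sc已创建):

import org.apache.spark.HashPartitioner

val left = sc.makeRDD(Seq((1, "a"), (2, "b"))).partitionBy(new HashPartitioner(4))
val right = sc.makeRDD(Seq((1, "x"), (3, "y")), 2)   //未设置分区方式

val joined = left.join(right)
println(joined.partitioner)        //Some(HashPartitioner):沿用已设置分区方式的父RDD
println(joined.getNumPartitions)   //4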

6.优选位置

存储每个Partition位置的列表(preferred location)

  • 对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块位置。

注意,按照“移动数据不如移动计算”原则,Spark在进行任务调度的时候,会尽可能根据数据文件存储的位置信息,将任务分配到数据所在的节点进行计算。
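
可以通过preferredLocations方法查看每个分区的优选位置(示意代码,假设sc已创建)。注意,textFile返回的RDD经过了一次map包装,其自身的getPreferredLocations为空,所以这里直接用hadoopFile得到HadoopRDD来观察;读取HDFS文件时一般会返回块所在的节点。

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

val hadoopRDD = sc.hadoopFile[LongWritable, Text, TextInputFormat](
  "hdfs://172.16.0.4:9000/data/grouplens/ml-1m/users.dat")
hadoopRDD.partitions.foreach { p =>
  println(s"分区${p.index}的优选位置:${hadoopRDD.preferredLocations(p)}")
}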
