Chapter 2 Spark Core: The Core Model
1. RDD
A Resilient Distributed Dataset (RDD) is the most fundamental data abstraction in Spark. Its key characteristics are:
- Immutable (read-only)
- Partitioned
- Computed in parallel
- Automatically fault-tolerant
- Location-aware scheduling
RDD is Spark's core abstraction and, at the implementation level, an abstract class. The key parts of the RDD source code are shown below:
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {
......
/**
* :: DeveloperApi ::
* Implemented by subclasses to compute a given partition.
*/
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
/**
* Implemented by subclasses to return the set of partitions in this RDD. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*
* The partitions in this array must satisfy the following property:
* `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
*/
protected def getPartitions: Array[Partition]
/**
* Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*/
protected def getDependencies: Seq[Dependency[_]] = deps
/**
* Optionally overridden by subclasses to specify placement preferences.
*/
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
/** Optionally overridden by subclasses to specify how they are partitioned. */
@transient val partitioner: Option[Partitioner] = None
// =======================================================================
// Methods and fields available on all RDDs
// =======================================================================
/** The SparkContext that created this RDD. */
def sparkContext: SparkContext = sc
/** A unique ID for this RDD (within its SparkContext). */
val id: Int = sc.newRddId()
......
}
An RDD exposes five properties that describe the state of the dataset.
- partitions: the data partitions (abstract)
- compute: the function that computes each partition (abstract)
- dependencies: the list of dependencies on parent RDDs (abstract)
- partitioner: how the data is partitioned (concrete)
- preferredLocations: the preferred locations (concrete)
Note that a concrete RDD implementation must override the first three properties.
Question 1: do the last two properties also need to be overridden in a concrete RDD implementation?
The concrete RDD implementation classes are shown below:
Note that although there are many RDD implementations, mastering the five key properties of the abstract RDD is enough; a hypothetical subclass is sketched below.
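To make the five properties concrete, here is a minimal, hypothetical RDD subclass (the names RangeRDD and RangePartition are invented for this sketch). It overrides only the two abstract members, compute and getPartitions, and keeps the defaults for getDependencies, partitioner and getPreferredLocations; it is an illustration, not how Spark's built-in RDDs are implemented.
import org.apache.spark.{Partition, SparkConf, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
// Hypothetical partition: records its index and the half-open range it covers.
class RangePartition(override val index: Int, val start: Int, val end: Int) extends Partition
// Hypothetical RDD producing the integers [0, n), split into `slices` partitions.
class RangeRDD(sc: SparkContext, n: Int, slices: Int) extends RDD[Int](sc, Nil) {
  // Abstract property 1: the partitions of this RDD.
  override protected def getPartitions: Array[Partition] =
    (0 until slices).map(i => new RangePartition(i, i * n / slices, (i + 1) * n / slices)).toArray
  // Abstract property 2: how one partition is computed.
  override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    val p = split.asInstanceOf[RangePartition]
    (p.start until p.end).iterator
  }
  // getDependencies (Nil here), partitioner (None) and getPreferredLocations (Nil) keep their defaults.
}
object RangeRDDDemo {
  def main(args: Array[String]): Unit = {
    val sc = SparkContext.getOrCreate(
      new SparkConf().setMaster("local[*]").setAppName("range-rdd-demo"))
    println(new RangeRDD(sc, 10, 3).collect().mkString(","))  // 0,1,2,...,9
    sc.stop()
  }
}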
2. Data Partitions
Viewed from the data's perspective, an RDD is made up of data partitions that live on different nodes of the cluster.
- An RDD can contain multiple partitions
- A partition is a fragment of the dataset
- A partition is wrapped into one Task
The internal data layout of an RDD is shown in the figure:
The Partition source code is as follows:
/**
* An identifier for a partition in an RDD.
*/
trait Partition extends Serializable {
/**
* Get the partition's index within its parent RDD
*/
def index: Int
// A better default implementation of HashCode
override def hashCode(): Int = index
override def equals(other: Any): Boolean = super.equals(other)
}
Question 2: does a data partition store the data itself? Does an RDD store the actual data?
A data partition does not store the actual data:
- the Partition trait only holds an index member, the partition's number within its RDD;
- the RDD id together with the partition index uniquely identifies the block that backs the partition;
- using the interfaces provided by the underlying storage layer,
- the partition's data can then be fetched from the storage medium (e.g. HDFS or memory); the block-id naming is sketched below.
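As a rough sketch of the "RDD id + partition index" mapping: the storage layer identifies a cached partition by a block id of the form rdd_<rddId>_<partitionIndex> (org.apache.spark.storage.RDDBlockId). The numbers below are made up; the snippet only illustrates the naming scheme, not how user code normally reads data.
import org.apache.spark.storage.RDDBlockId
object BlockIdDemo {
  def main(args: Array[String]): Unit = {
    // Partition 3 of the RDD whose id is 42 corresponds to the block "rdd_42_3".
    val blockId = RDDBlockId(42, 3)
    println(blockId.name) // rdd_42_3
  }
}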
2.1 Building an RDD
2.1.1 Reading external datasets
- Text files
sc.textFile(path[,minPartitions])
sc.wholeTextFiles(path[,minPartitions])
- Binary files
sc.binaryFiles(path[,minPartitions])
- Object files
sc.objectFile[T](path)
- SequenceFile
sc.sequenceFile(path,keyClass,valueClass[,minPartitions])
sc.sequenceFile[K,V](path[,minPartitions])
- Hadoop input/output formats
sc.newAPIHadoopFile[Text,Text,KeyValueTextInputFormat](path)
sc.newAPIHadoopFile[K,V,F](path)
Notes:
- path may be a single file, a directory, or a path containing wildcards.
- minPartitions is the default minimum number of partitions of the HadoopRDD when the user does not supply one. Note that Spark uses math.min, so defaultMinPartitions cannot be greater than 2.
- keyClass and valueClass are the types of the keys and values stored in the data file.
These methods are defined in the source code as follows:
//1. Text files
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
def wholeTextFiles(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
//....
}
//2. Binary files
def binaryFiles(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withScope {
//...
}
//3. Object files
def objectFile[T: ClassTag](
path: String,
minPartitions: Int = defaultMinPartitions): RDD[T] = withScope {
assertNotStopped()
sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
.flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes, Utils.getContextOrSparkClassLoader))
}
//4. Sequence files
def sequenceFile[K, V](path: String,
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int
): RDD[(K, V)] = withScope {
assertNotStopped()
val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
}
def sequenceFile[K, V](
path: String,
keyClass: Class[K],
valueClass: Class[V]): RDD[(K, V)] = withScope {
assertNotStopped()
sequenceFile(path, keyClass, valueClass, defaultMinPartitions)
}
def sequenceFile[K, V]
(path: String, minPartitions: Int = defaultMinPartitions)
(implicit km: ClassTag[K], vm: ClassTag[V],
kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = {
withScope {
assertNotStopped()
val kc = clean(kcf)()
val vc = clean(vcf)()
val format = classOf[SequenceFileInputFormat[Writable, Writable]]
val writables = hadoopFile(path, format,
kc.writableClass(km).asInstanceOf[Class[Writable]],
vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions)
writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
}
}
//5. Hadoop files (new API)
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
(path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope {
newAPIHadoopFile(
path,
fm.runtimeClass.asInstanceOf[Class[F]],
km.runtimeClass.asInstanceOf[Class[K]],
vm.runtimeClass.asInstanceOf[Class[V]])
}
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
path: String,
fClass: Class[F],
kClass: Class[K],
vClass: Class[V],
conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withScope {
assertNotStopped()
//...
}
/** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
def defaultParallelism: Int = {
assertNotStopped()
taskScheduler.defaultParallelism
}
/**
* Default min number of partitions for Hadoop RDDs when not given by user
* Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2.
* The reasons for this are discussed in https://github.com/mesos/spark/pull/718
*/
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
2.1.2 Parallelizing a Seq
sc.makeRDD(seq[,numPartition])
sc.parallelize(seq[,numPartition])
Notes:
- seq is the Seq collection to parallelize
- numPartition is the number of partitions (the degree of parallelism)
The makeRDD and parallelize methods are defined in the source code as follows:
def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
parallelize(seq, numSlices)
}
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
2.1.3 Examples
- Read the file /opt/spark/README.md with both the textFile and wholeTextFiles methods.
import org.apache.spark.{SparkConf, SparkContext}
object RDDTest1{
def main(args:Array[String]):Unit={
//1. Build the SparkConf
val conf=new SparkConf();
conf.setMaster("local[*]")
conf.setAppName("Example 1")
//2. Build the SparkContext
val sc=new SparkContext(conf);
sc.setLogLevel("warn")
//3. Build the RDDs
//3.1 Read a text file with textFile
val rdd1=sc.textFile("/opt/spark/README.md")
println(rdd1.count)
println(rdd1.first)
//3.2 Read a text file with wholeTextFiles
val rdd2=sc.wholeTextFiles("/opt/spark/README.md")
println(rdd2.count)
println(rdd2.first)
//5. Stop the SparkContext
sc.stop()
}
}
Observe: rdd1 and rdd2 call the same methods; how does their output differ?
- Build an RDD by parallelizing a Seq collection
import org.apache.spark.{SparkConf, SparkContext}
object RDDTest2{
def main(args:Array[String]):Unit={
//1. Build the SparkConf
val conf=new SparkConf();
conf.setMaster("local[*]")
conf.setAppName("Example 2")
//2. Build the SparkContext
val sc=new SparkContext(conf);
sc.setLogLevel("warn")
//3. Build the RDD
//Parallelize a Seq collection with makeRDD
val seq=1 to 10
val rdd1=sc.makeRDD(seq)
//4.1 Add 1 to every element of rdd1 with map, producing a new RDD
val rdd2=rdd1.map(x=>x+1)
//4.2 Print the elements of rdd2 to the console
rdd2.foreach(println)
//5. Stop the SparkContext
sc.stop()
}
}
Note that in both examples we have to build a SparkContext, and the code is almost identical each time.
Question 3: can the construction of the SparkContext be encapsulated? If so, how?
One way is to encapsulate it in a Scala package object:
package com
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* Spark utility package object
* that makes it easier to build and use Spark.
* */
package object briup {
private var _conf:Option[SparkConf]=None;
private var _sc:Option[SparkContext]=None;
private var _spark:Option[SparkSession]=None;
private var _ssc:Option[StreamingContext]=None;
implicit val jarFilePath:Option[String]=None;
/**
*
* Build or reuse the SparkConf
**/
private def getConf(master:String,appName:String,checkPoint:String="spark-checkpoint"):SparkConf={
_conf match{
case Some(conf) => conf
case None =>
val conf=new SparkConf()
conf.setMaster(master)
conf.setAppName(appName)
conf.set("spark.sql.streaming.checkpointLocation",checkPoint)
_conf=Some(conf)
conf
}
}
/**
* Build or reuse the SparkContext
* */
def getSparkContext(master:String,appName:String)(implicit jarFilePath:Option[String]=None):SparkContext={
_sc match{
case Some(sc) => sc
case None =>
val conf=getConf(master,appName)
//Option 1: construct directly
// val sc=new SparkContext(conf);
//Option 2: reuse an existing context if one exists
val sc=SparkContext.getOrCreate(conf);
jarFilePath match {
case Some(filepath) => sc.addJar(filepath)
case None =>
}
_sc=Some(sc)
sc.setLogLevel("warn")
sc
}
}
/**
* Build or reuse the SparkSession
* */
def getSpark(master:String,appName:String,checkPoint:String="spark-checkpoint")(implicit jarFilePath:Option[String]):SparkSession={
_spark match{
case Some(spark) =>
// println("...reusing the existing SparkSession...")
spark
case None =>
// println("...creating a new SparkSession...")
val conf=getConf(master,appName)
val spark=SparkSession.builder().config(conf).getOrCreate();
jarFilePath match {
case Some(filepath) => spark.sparkContext.addJar(filepath)
case None => //println("no jarFilePath given");
}
_spark=Some(spark)
spark
}
}
/**
* Build or reuse the StreamingContext
* */
def getStreamingSpark(master:String,appName:String,batchDur:Duration)(implicit jarFilePath:Option[String]=None):StreamingContext={
_ssc match{
case Some(ssc) =>ssc
case None =>
val conf=getConf(master,appName)
val ssc=new StreamingContext(conf,batchDur)
jarFilePath match {
case Some(filepath) => ssc.sparkContext.addJar(filepath)
case None => //println("no jarFilePath given");
}
_ssc=Some(ssc)
ssc
}
}
}
- Working with non-text data files
Data path: hdfs://172.16.0.4:9000/data/grouplens/ml-1m/users.dat
Data format: userID::gender::age::occupation::zip code
Implementation:
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
import org.apache.spark.{SparkConf, SparkContext}
object RDDTest3{
def main(args:Array[String]):Unit={
//1. Build the SparkConf
val conf=new SparkConf();
conf.setMaster("local[*]")
conf.setAppName("Example 3: practice with various data formats")
//2. Build the SparkContext
val sc=SparkContext.getOrCreate(conf)
//3. Build RDDs + 4. operate on them
//1. Read a text file into an RDD
val rdd=sc.textFile("hdfs://172.16.0.4:9000/data/grouplens/ml-1m/users.dat")
//2. Print to the console
rdd.foreach(println)
//3. Save as an object file; note that the argument is a directory name
rdd.saveAsObjectFile("users_obj")
//4. Read the object file back
val objectRDD=sc.objectFile[String]("users_obj")
//5. Print to the console
objectRDD.foreach(println)
//6. Read the file as binary
val binaryRDD=sc.binaryFiles("hdfs://172.16.0.4:9000/data/grouplens/ml-1m/users.dat")
//7. Print to the console
binaryRDD.foreach(println)
//8. Save as a SequenceFile
objectRDD.map(str=>(str.length,str)).saveAsSequenceFile("users_seq")
//9. Read the SequenceFile back
val seqRDD=sc.sequenceFile[Int,String]("users_seq")
//10. Print to the console
seqRDD.foreach(println)
//11. Save in a Hadoop (new API) format
seqRDD.map(tu=>(new IntWritable(tu._1),new Text(tu._2))).saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable,Text]]("users_hadoop")
//12. Read the Hadoop-format file back
val hadoopRDD=sc.newAPIHadoopFile[IntWritable,Text,SequenceFileInputFormat[IntWritable,Text]]("users_hadoop")
//13. Print to the console
hadoopRDD.foreach(println)
//5. Stop the SparkContext
sc.stop()
}
}
Question 4: now that we know how to build an RDD, how do we check how many partitions it has?
2.2 Number of Partitions
Get the number of partitions: rdd.getNumPartitions
Demo:
import org.apache.spark.{SparkConf, SparkContext}
object RDDTest2{
def main(args:Array[String]):Unit={
//1. Build the SparkConf
val conf=new SparkConf();
conf.setMaster("local[*]")
conf.setAppName("Demo 1: number of partitions")
//2. Build the SparkContext
val sc=SparkContext.getOrCreate(conf)
//3. Build the RDD
val rdd=sc.textFile("hdfs://172.16.0.4/data/grouplens/ml-1m/users.dat")
//3.1 Check the number of partitions
println(rdd.getNumPartitions)
//5. Stop the SparkContext
sc.stop()
}
}
Note that the number of partitions determines the number of Tasks in a Stage; it is the degree of parallelism used by Spark's task scheduler.
Question 5: how do we set the number of partitions of an RDD?
- Specify it when the RDD is created
- When reading external files: the optional minPartitions parameter
- If not given, it defaults to math.min(default parallelism of the scheduler, 2)
- minPartitions is the default minimum number of partitions of the HadoopRDD when the user does not supply one
- When parallelizing a Seq: the optional numPartitions parameter
- If not given, it defaults to the scheduler's default parallelism
- numPartitions is the number of partitions
- Repartition (adjust the partition count of an existing RDD) with one of the methods below
rdd.repartition(numPartitions:Int)
rdd.coalesce(numPartitions:Int,shuffle:Boolean)
pairRdd.repartitionAndSortWithinPartitions(partitioner: Partitioner)
The repartitioning methods are defined in the source code as follows:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
}
} : Iterator[(Int, T)]
// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T](
mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
new HashPartitioner(numPartitions)),
numPartitions,
partitionCoalescer).values
} else {
new CoalescedRDD(this, numPartitions, partitionCoalescer)
}
}
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
}
Question 6: does repartitioning always require a shuffle over the network?
Note:
- Between repartition and coalesce, Spark recommends coalesce, the optimized version of repartition.
- If the data inside each partition needs to be sorted after repartitioning, Spark recommends repartitionAndSortWithinPartitions.
Demo:
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
object RDDTest2{
def main(args:Array[String]):Unit={
//1. Build the SparkConf
val conf=new SparkConf();
conf.setMaster("local[*]")
conf.setAppName("Demo 2: number of partitions")
//2. Build the SparkContext
val sc=SparkContext.getOrCreate(conf)
//3.1 Specify the partition count when building the RDD
val rdd1=sc.textFile("hdfs://172.16.0.4/data/grouplens/ml-1m/users.dat")
println(rdd1.getNumPartitions)
val rdd2=sc.textFile("hdfs://172.16.0.4/data/grouplens/ml-1m/users.dat",5)
println(rdd2.getNumPartitions)
val rdd3=sc.makeRDD(1 to 10)
println(rdd3.getNumPartitions)
val rdd4=sc.makeRDD(1 to 10,4)
println(rdd4.getNumPartitions)
//3.2 Adjust the partition count by repartitioning
println(s"Before repartition: ${rdd1.getNumPartitions} partitions")
val re_rdd1=rdd1.repartition(3)
println(s"After repartition: ${re_rdd1.getNumPartitions} partitions")
println("-----Decreasing the partition count-------")
println(s"Before coalesce: ${rdd2.getNumPartitions} partitions")
val co_rdd2=rdd2.coalesce(3)
println(s"After coalesce: ${co_rdd2.getNumPartitions} partitions")
println("-----Increasing the partition count-------")
println(s"Before coalesce: ${rdd2.getNumPartitions} partitions")
val co_rdd3=rdd2.coalesce(10)
println(s"After coalesce: ${co_rdd3.getNumPartitions} partitions")
println(s"Before repartitionAndSortWithinPartitions: ${rdd3.getNumPartitions} partitions")
val partitioner=new HashPartitioner(5)
val ras_rdd3=rdd3.map(x=>(x,1)).repartitionAndSortWithinPartitions(partitioner)
println(s"After repartitionAndSortWithinPartitions: ${ras_rdd3.getNumPartitions} partitions")
//5. Stop the SparkContext
sc.stop()
}
}
Question 7: what does the second parameter of coalesce mean? Why is coalesce preferable to repartition?
Reasons:
- repartition always performs a shuffle, which is expensive over the network and hurts performance.
- When decreasing the number of partitions, coalesce can avoid the shuffle entirely, saving network overhead and improving efficiency.
Question 8: when increasing the number of partitions, must coalesce perform a shuffle? Why?
Reasons:
- When decreasing the partition count, several partitions can simply be merged into one larger partition, so no shuffle is needed.
- When increasing the partition count, the data of a partition has to be scattered across several partitions, which requires a shuffle. A sketch that makes this observable follows below.
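A minimal sketch that makes the two points above observable (the numbers are arbitrary): shrinking with coalesce works without a shuffle, while growing only takes effect when shuffle = true, which is exactly what repartition does.
import org.apache.spark.{SparkConf, SparkContext}
object CoalesceShuffleDemo {
  def main(args: Array[String]): Unit = {
    val sc = SparkContext.getOrCreate(
      new SparkConf().setMaster("local[*]").setAppName("coalesce-vs-repartition"))
    val rdd = sc.makeRDD(1 to 100, 8)
    // Shrinking: partitions are merged locally, no shuffle is needed.
    println(rdd.coalesce(4).getNumPartitions)                  // 4
    // Growing without a shuffle has no effect: still 8 partitions.
    println(rdd.coalesce(16).getNumPartitions)                 // 8
    // Growing with shuffle = true (what repartition does) works: 16 partitions.
    println(rdd.coalesce(16, shuffle = true).getNumPartitions) // 16
    sc.stop()
  }
}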
3. Partition Computation
Computation on an RDD is performed partition by partition; every RDD implements the compute function for this purpose.
/**
* :: DeveloperApi ::
* Implemented by subclasses to compute a given partition.
*/
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
Note that compute is a developer API; application code written against Spark is not supposed to call it directly.
4. Dependency List
Before looking at dependencies, let's first review RDD operations.
4.1 RDD Operations
4.1.1 Transformations
Transformations are operations that return a new RDD; they are evaluated lazily.
Common transformations are listed below; a combined usage sketch follows the tables.
- Element-based operations
1 | map(func) | Passes each element of the source through func and returns a new distributed dataset made of the results |
---|---|---|
2 | flatMap(func) | Similar to map, but each input element may be mapped to 0 or more output elements (so func returns a Seq rather than a single item) |
- Partition-based operations
1 | mapPartitions(func) | Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator[T] => Iterator[U] when running on an RDD of type T |
---|---|---|
2 | mapPartitionsWithIndex(func) | Similar to mapPartitions, but also passes func an integer giving the partition index, so func must be of type (Int, Iterator[T]) => Iterator[U] |
- Aggregation operations
1 | reduceByKey([partitioner,]fun[,numPartitions]) | Aggregates the values of each key with fun |
---|---|---|
2 | foldByKey(defaultValue[,numPartitions/partitioner])(fun) | Aggregates the values of each key with fun, starting from a given initial value |
3 | combineByKey[A](fun1,fun2,fun3) | Combines the values of each key: fun1 is called the first time a key appears in a partition, fun2 when the key appears again in the same partition, and fun3 to merge results for the same key across partitions |
- Grouping operations
1 | groupByKey([partitioner|numPartitions]) | Groups the values by key; a partition count or a partitioner can be supplied |
---|---|---|
2 | groupBy(fun[,numPartitions|Partitioner]) | Groups the elements by the return value of fun; a partition count or a partitioner can be supplied |
3 | groupWith(otherRDD*) | Called on datasets of type (K, V) and (K, W); returns a dataset of (K, Seq[V], Seq[W]) tuples. In other frameworks this operation is known as CoGroup |
- Join operations
1 | join(other) | Inner join |
---|---|---|
2 | rightOuterJoin(other) | Right outer join |
3 | leftOuterJoin(other) | Left outer join |
4 | fullOuterJoin(other) | Full outer join |
5 | cogroup(other) | Grouped join (cogroup) |
6 | subtractByKey(other) | Removes the pairs whose key appears in the other RDD |
- Sorting operations
1 | sortByKey([boolean]) | Sorts by key, ascending by default; boolean=false sorts descending, true ascending |
---|---|---|
2 | sortBy(fun,[boolean]) | Sorts by the return value of fun, ascending by default; boolean=false sorts descending, true ascending |
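The sketch below strings several of the transformations above together on a small, made-up collection; the comments describe the results expected under these assumptions.
import org.apache.spark.{SparkConf, SparkContext}
object TransformationDemo {
  def main(args: Array[String]): Unit = {
    val sc = SparkContext.getOrCreate(
      new SparkConf().setMaster("local[*]").setAppName("transformation-demo"))
    val lines = sc.makeRDD(Seq("hello spark", "hello scala"), 2)
    // map keeps one output per input; flatMap may emit several.
    println(lines.map(_.toUpperCase).collect().mkString(" | "))   // HELLO SPARK | HELLO SCALA
    val words = lines.flatMap(_.split(" "))                       // hello, spark, hello, scala
    val pairs = words.map(w => (w, 1))
    // reduceByKey / foldByKey / combineByKey all aggregate the values of each key.
    val counts = pairs.reduceByKey(_ + _)                         // (hello,2), (spark,1), (scala,1)
    println(pairs.foldByKey(0)(_ + _).collect().toMap)            // same result, starting from an initial value
    println(pairs.combineByKey((v: Int) => v, (c: Int, v: Int) => c + v, (c1: Int, c2: Int) => c1 + c2).collect().toMap)
    // join combines two pair RDDs on their keys.
    val kinds  = sc.makeRDD(Seq(("hello", "greeting"), ("spark", "engine")))
    val joined = counts.join(kinds)                               // (hello,(2,greeting)), (spark,(1,engine))
    // sortByKey / sortBy order the data.
    counts.sortByKey().collect().foreach(println)
    joined.sortBy(_._2._1, ascending = false).collect().foreach(println)
    sc.stop()
  }
}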
4.1.2 Actions
Actions are operations that return a result to the driver or write data to a storage system; an action triggers the evaluation of the preceding transformations.
Common actions are listed below; a combined usage sketch follows the tables.
- Fetching some of the elements
1 | first | Returns the first element (similar to take(1)) |
---|---|---|
2 | max | Returns the largest element |
3 | min | Returns the smallest element |
4 | top(num) | Returns the first num elements in descending order as an array |
5 | take(num) | Returns an array with the first num elements of the dataset. Note that this is currently not computed in parallel on multiple nodes; the driver machine computes the elements itself (memory pressure on the driver grows, so use it with care) |
6 | takeOrdered(n, [ordering]) | Returns the first n elements of the RDD using either their natural order or a custom comparator |
7 | takeSample(withReplacement, num, [seed]) | Returns an array with a random sample of num elements of the dataset, with or without replacement, optionally using a given random seed |
- Reduction operations
1 | reduce(func) | Combines the elements pairwise with the binary operation func |
---|---|---|
2 | fold(defaultValue)(func) | Like reduce, but with an initial (zero) value |
3 | aggregate[U] (defaultValue) (func1,func2) | Applies func1 within each partition and func2 across partitions |
- Output to external systems
1 | saveAsTextFile(path) | Saves as a text file |
---|---|---|
2 | saveAsObjectFile(path) | Saves as an object file |
3 | saveAsSequenceFile(path) (Java and Scala) | Saves as a SequenceFile |
4 | saveAsHadoopFile[OutputFormat[Key,Value]] (path) | Saves as a Hadoop file |
5 | foreach(func) | Applies func to every element |
6 | foreachPartition(func) | Applies func to every partition |
- Other operations
1 | reduce(func) | Aggregates all elements of the dataset with func. func takes 2 arguments and returns one value, and must be associative so that it can be executed correctly in parallel |
---|---|---|
2 | collect() | Returns all elements of the dataset to the driver program as an array. Usually used after filter or another operation that leaves a sufficiently small subset of the data; collecting an entire large RDD can easily cause the driver to run out of memory |
3 | count() | Returns the number of elements in the dataset |
4 | countByValue | Counts the occurrences of each element |
5 | countByKey | Counts the elements for each key |
6 | collectAsMap | Returns the result as a map |
7 | lookup(key) | Returns all values associated with the given key |
Note that every time a new action is called, the whole RDD lineage is recomputed from the beginning.
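The sketch below exercises several of the actions above on a small, made-up collection; the values in the comments are what these assumptions would produce.
import org.apache.spark.{SparkConf, SparkContext}
object ActionDemo {
  def main(args: Array[String]): Unit = {
    val sc = SparkContext.getOrCreate(
      new SparkConf().setMaster("local[*]").setAppName("action-demo"))
    val nums = sc.makeRDD(Seq(3, 1, 4, 1, 5), 2)
    println(nums.first())                     // 3
    println(nums.take(3).mkString(","))       // 3,1,4
    println(nums.top(2).mkString(","))        // 5,4
    println(nums.count())                     // 5
    println(nums.reduce(_ + _))               // 14
    println(nums.fold(0)(_ + _))              // 14
    // aggregate: sum within each partition, then combine the partial sums.
    println(nums.aggregate(0)(_ + _, _ + _))  // 14
    println(nums.countByValue())              // e.g. Map(1 -> 2, 3 -> 1, 4 -> 1, 5 -> 1)
    val pairs = nums.map(n => (n % 2, n))
    println(pairs.countByKey())               // e.g. Map(1 -> 4, 0 -> 1)
    println(pairs.lookup(0).mkString(","))    // 4 (all values for key 0)
    sc.stop()
  }
}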
4.1.3 Caching
When several queries run over the same data, the RDD can be cached in memory so that subsequent queries reuse it, which speeds them up.
Related methods:
- cache()
- persist([storage level])
- unpersist()
Storage levels:
Storage Level | Meaning |
---|---|
MEMORY_ONLY | Store the RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they are needed. This is the default level. |
MEMORY_AND_DISK | Store the RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that do not fit on disk and read them from there when needed. |
MEMORY_ONLY_SER (Java and Scala) | Store the RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially with a fast serializer (http://spark.apache.org/docs/latest/tuning.html), but more CPU-intensive to read. |
MEMORY_AND_DISK_SER (Java and Scala) | Similar to MEMORY_ONLY_SER, but spills partitions that do not fit in memory to disk instead of recomputing them on the fly each time they are needed. |
DISK_ONLY | Store the RDD partitions only on disk. |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | Same as the levels above, but replicate each partition on two cluster nodes. |
OFF_HEAP (experimental) | Similar to MEMORY_ONLY_SER, but store the data in off-heap memory (http://spark.apache.org/docs/latest/configuration.html#memory-management). This requires off-heap memory to be enabled. |
Note that caching behaves like a transformation: it is lazy, and nothing is actually cached until the first action is executed. A minimal caching sketch follows.
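A minimal caching sketch, assuming a local file log.txt exists: the RDD is marked for persistence before the first action, reused by the second action, and released afterwards.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
object CacheDemo {
  def main(args: Array[String]): Unit = {
    val sc = SparkContext.getOrCreate(
      new SparkConf().setMaster("local[*]").setAppName("cache-demo"))
    val errors = sc.textFile("log.txt").filter(_.contains("error"))
    // Mark the RDD for caching; nothing happens yet because persisting is lazy.
    errors.persist(StorageLevel.MEMORY_AND_DISK) // cache() is shorthand for persist(MEMORY_ONLY)
    println(errors.count())         // first action: computes the RDD and fills the cache
    errors.take(5).foreach(println) // second action: served from the cache
    errors.unpersist()              // release the cached blocks
    sc.stop()
  }
}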
4.1.4 Example
Example: count the number of male and female users in the user table.
Data path: hdfs://172.16.0.4/data/grouplens/ml-1m/users.dat
Data format: userID::gender::age::occupation::zip code
Implementation:
import org.apache.spark.{SparkConf, SparkContext}
object MovieTest{
def main(args:Array[String]):Unit={
//Build the SparkConf
val conf=new SparkConf();
conf.setMaster("local[*]")
conf.setAppName("Users by gender")
//Build the SparkContext
val sc=SparkContext.getOrCreate(conf)
//1. Read the dataset
val userRDD=sc.textFile("hdfs://172.16.0.4/data/grouplens/ml-1m/users.dat")
//2. Split each line on :: and keep the second field, the gender
val mapRDD=userRDD.map(line=>line.split("::")(1))
//3. Turn each element into a pair: (gender, 1)
val map2RDD=mapRDD.map(gender=>(gender,1))
//4. Sum the values per key, i.e. count male and female users
val resultRDD=map2RDD.reduceByKey(_+_);
//val resultRDD=map2RDD.groupByKey.mapValues(value=>value.reduce(_+_));
//5. Print the result to the console
resultRDD.foreach(println)
//6. Save the result as a text file
resultRDD.saveAsTextFile("userNumByGender_result_1")
//Release resources
sc.stop()
}
}
Question: if caching were added to the code above to improve efficiency, where should the cache call go?
Question: comparing the reduceByKey line with the commented-out groupByKey alternative in the example above, what is the difference between reduceByKey and groupByKey?
4.2 Dependencies
The abstract RDD follows the decorator (wrapper) structural design pattern; the figure below is drawn from the word-count example of Chapter 1, Getting to Know Spark.
Because a transformation returns a new RDD, a relationship exists between the new RDD and the original one; this relationship is called a dependency.
Different operators produce different kinds of dependencies, depending on their characteristics.
- map, for example, produces a narrow dependency
- while join generally produces a wide dependency
Dependency categories:
- Narrow dependency
- each partition of the parent RDD is used by at most one partition of the child RDD
- Wide dependency
- a partition of the parent RDD is used by multiple partitions of the child RDD
The dependency relationships are illustrated by the examples in the figure below.
Question: what can the dependency information be used for?
Hints:
- Task scheduling
- The dependencies form a DAG, which is used to split the job into Stages and produce TaskSets.
- Fault tolerance
- When some partitions are lost, they can be recomputed from the dependencies without recomputing every partition.
- In Spark this is usually called the lineage (lineage graph). A sketch showing how to inspect it at runtime follows below.
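Lineage and dependencies can be inspected at runtime. A minimal sketch with a made-up collection: dependencies returns the Dependency objects of the last RDD, and toDebugString prints the lineage graph.
import org.apache.spark.{SparkConf, SparkContext}
object LineageDemo {
  def main(args: Array[String]): Unit = {
    val sc = SparkContext.getOrCreate(
      new SparkConf().setMaster("local[*]").setAppName("lineage-demo"))
    val words  = sc.makeRDD(Seq("a", "b", "a", "c"), 2)
    val counts = words.map(w => (w, 1)).reduceByKey(_ + _)
    // reduceByKey introduces a ShuffleDependency on its parent.
    counts.dependencies.foreach(dep => println(dep.getClass.getSimpleName))
    // Human-readable lineage graph of the whole chain.
    println(counts.toDebugString)
    sc.stop()
  }
}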
The source code of the Dependency base class is as follows:
/**
* :: DeveloperApi ::
* Base class for dependencies.
*/
@DeveloperApi
abstract class Dependency[T] extends Serializable {
def rdd: RDD[T]
}
4.2.1 Narrow Dependencies
Each partition of the parent RDD is used by at most one partition of the child RDD.
The concrete subclasses are:
OneToOneDependency
RangeDependency
The source code of NarrowDependency and its concrete subclasses is shown below:
/**
* :: DeveloperApi ::
* Base class for dependencies where each partition of the child RDD depends on a small number
* of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
*/
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
/**
* Get the parent partitions for a child partition.
* @param partitionId a partition of the child RDD
* @return the partitions of the parent RDD that the child partition depends upon
*/
def getParents(partitionId: Int): Seq[Int]
override def rdd: RDD[T] = _rdd
}
/**
* :: DeveloperApi ::
* Represents a one-to-one dependency between partitions of the parent and child RDDs.
*/
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
/**
* :: DeveloperApi ::
* Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
* @param rdd the parent RDD
* @param inStart the start of the range in the parent RDD
* @param outStart the start of the range in the child RDD
* @param length the length of the range
*/
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = {
if (partitionId >= outStart && partitionId < outStart + length) {
List(partitionId - outStart + inStart)
} else {
Nil
}
}
}
Question: in the dependency-relationship figure, which of the three narrow-dependency examples use OneToOneDependency and which use RangeDependency?
Answer: examples 1 and 3 use OneToOneDependency; example 2 uses RangeDependency. The sketch below shows how to verify this kind of claim programmatically.
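A small sketch that checks this on made-up data: map keeps a one-to-one mapping between parent and child partitions, while a union of two plain RDDs concatenates their partition ranges and produces one RangeDependency per parent.
import org.apache.spark.{SparkConf, SparkContext}
object NarrowDependencyDemo {
  def main(args: Array[String]): Unit = {
    val sc = SparkContext.getOrCreate(
      new SparkConf().setMaster("local[*]").setAppName("narrow-dependency-demo"))
    val a = sc.makeRDD(1 to 10, 2)
    val b = sc.makeRDD(11 to 20, 3)
    // map: one parent partition per child partition.
    println(a.map(_ * 2).dependencies.map(_.getClass.getSimpleName).mkString(","))   // OneToOneDependency
    // union: one RangeDependency per parent RDD.
    println(a.union(b).dependencies.map(_.getClass.getSimpleName).mkString(","))     // RangeDependency,RangeDependency
    sc.stop()
  }
}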
4.2.2 Wide Dependencies
A partition of the parent RDD is used by multiple partitions of the child RDD.
/**
* :: DeveloperApi ::
* Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
* the RDD is transient since we don't need it on the executor side.
*
* @param _rdd the parent RDD
* @param partitioner partitioner used to partition the shuffle output
* @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If not set
* explicitly then the default serializer, as specified by `spark.serializer`
* config option, will be used.
* @param keyOrdering key ordering for RDD's shuffles
* @param aggregator map/reduce-side aggregator for RDD's shuffle
* @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
* @param shuffleWriterProcessor the processor to control the write behavior in ShuffleMapTask
*/
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Serializer = SparkEnv.get.serializer,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false,
val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
extends Dependency[Product2[K, V]] {
if (mapSideCombine) {
require(aggregator.isDefined, "Map-side combine without Aggregator specified!")
}
override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
// Note: It's possible that the combiner class tag is null, if the combineByKey
// methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
private[spark] val combinerClassName: Option[String] =
Option(reflect.classTag[C]).map(_.runtimeClass.getName)
val shuffleId: Int = _rdd.context.newShuffleId()
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, this)
_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
_rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
}
4.3 The Directed Acyclic Graph (DAG)
In the task execution mechanism, the dependency graph is referred to as the DAG.
Write code following the steps below, then inspect the resulting DAG.
- Read a log file
- Filter out the lines that contain "error"
- From the result of the first step, filter out the lines that contain "warning"
- Union the RDDs of the second and third steps
- Count the number of matching lines
- Take ten matching lines and print them to the console
import org.apache.spark.{SparkConf, SparkContext}
object Test{
def main(args:Array[String]):Unit={
//Build the SparkConf
val conf=new SparkConf();
conf.setMaster("local[*]")
conf.setAppName("DAG demo")
//Build the SparkContext
val sc=SparkContext.getOrCreate(conf)
val inputRDD=sc.textFile("log.txt")
val errorRDD=inputRDD.filter(line => line.contains("error"))
val warningRDD=inputRDD.filter(line => line.contains("warning"))
val badLinesRDD=errorRDD.union(warningRDD)
println("Input had "+badLinesRDD.count+" concerning lines")
println("Here are 10 examples:")
badLinesRDD.take(10).foreach(println)
//Release resources
sc.stop()
}
}
Browse the Spark web UI and look at the DAG it displays.
Question: using the decorator-pattern view of RDDs, draw the RDD transformation graph for this application.
5. The Partitioner
- A Spark program can reduce communication cost by controlling how its RDDs are partitioned
- Partitioning controls, by key, which partition each record is placed in
- Only key-value RDDs can have a partitioner
- For non key-value RDDs, partitioner returns None
The Partitioner source code is as follows:
/**
* An object that defines how the elements in a key-value pair RDD are partitioned by key.
* Maps each key to a partition ID, from 0 to `numPartitions - 1`.
*
* Note that, partitioner must be deterministic, i.e. it must return the same partition id given the same partition key.
*/
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}
object Partitioner {
/**
* Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
*
* If spark.default.parallelism is set, we'll use the value of SparkContext defaultParallelism
* as the default partitions number, otherwise we'll use the max number of upstream partitions.
*
* When available, we choose the partitioner from rdds with maximum number of partitions. If this
* partitioner is eligible (number of partitions within an order of maximum number of partitions
* in rdds), or has partition number higher than or equal to default partitions number - we use
* this partitioner.
*
* Otherwise, we'll use a new HashPartitioner with the default partitions number.
*
* Unless spark.default.parallelism is set, the number of partitions will be the same as the
* number of partitions in the largest upstream RDD, as this should be least likely to cause
* out-of-memory errors.
*
* We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
*/
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
val rdds = (Seq(rdd) ++ others)
val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
val hasMaxPartitioner: Option[RDD[_]] = if (hasPartitioner.nonEmpty) {
Some(hasPartitioner.maxBy(_.partitions.length))
} else {
None
}
val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
rdd.context.defaultParallelism
} else {
rdds.map(_.partitions.length).max
}
// If the existing max partitioner is an eligible one, or its partitions number is larger
// than or equal to the default number of partitions, use the existing partitioner.
if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
defaultNumPartitions <= hasMaxPartitioner.get.getNumPartitions)) {
hasMaxPartitioner.get.partitioner.get
} else {
new HashPartitioner(defaultNumPartitions)
}
}
/**
* Returns true if the number of partitions of the RDD is either greater than or is less than and
* within a single order of magnitude of the max number of upstream partitions, otherwise returns
* false.
*/
private def isEligiblePartitioner(
hasMaxPartitioner: RDD[_],
rdds: Seq[RDD[_]]): Boolean = {
val maxPartitions = rdds.map(_.partitions.length).max
log10(maxPartitions) - log10(hasMaxPartitioner.getNumPartitions) < 1
}
}
5.1 Partitioner Classes
5.1.1 HashPartitioner
/**
* A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
* Java's `Object.hashCode`.
*
* Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
* so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
* produce an unexpected or incorrect result.
*/
class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
}
5.1.2 RangePartitioner
/**
* A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
* equal ranges. The ranges are determined by sampling the content of the RDD passed in.
*
* @note The actual number of partitions created by the RangePartitioner might not be the same
* as the `partitions` parameter, in the case where the number of sampled records is less than
* the value of `partitions`.
*/
class RangePartitioner[K : Ordering : ClassTag, V](
partitions: Int,
rdd: RDD[_ <: Product2[K, V]],
private var ascending: Boolean = true,
val samplePointsPerPartitionHint: Int = 20)
extends Partitioner {
// A constructor declared in order to maintain backward compatibility for Java, when we add the
// 4th constructor parameter samplePointsPerPartitionHint. See SPARK-22160.
// This is added to make sure from a bytecode point of view, there is still a 3-arg ctor.
def this(partitions: Int, rdd: RDD[_ <: Product2[K, V]], ascending: Boolean) = {
this(partitions, rdd, ascending, samplePointsPerPartitionHint = 20)
}
// We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")
require(samplePointsPerPartitionHint > 0,
s"Sample points per partition must be greater than 0 but found $samplePointsPerPartitionHint")
private var ordering = implicitly[Ordering[K]]
// An array of upper bounds for the first (partitions - 1) partitions
private var rangeBounds: Array[K] = {
if (partitions <= 1) {
Array.empty
} else {
// This is the sample size we need to have roughly balanced output partitions, capped at 1M.
// Cast to double to avoid overflowing ints or longs
val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
// Assume the input partitions are roughly balanced and over-sample a little bit.
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
if (numItems == 0L) {
Array.empty
} else {
// If a partition contains much more than the average number of items, we re-sample from it
// to ensure that enough items are collected from that partition.
val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
val candidates = ArrayBuffer.empty[(K, Float)]
val imbalancedPartitions = mutable.Set.empty[Int]
sketched.foreach { case (idx, n, sample) =>
if (fraction * n > sampleSizePerPartition) {
imbalancedPartitions += idx
} else {
// The weight is 1 over the sampling probability.
val weight = (n.toDouble / sample.length).toFloat
for (key <- sample) {
candidates += ((key, weight))
}
}
}
if (imbalancedPartitions.nonEmpty) {
// Re-sample imbalanced partitions with the desired sampling probability.
val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
val seed = byteswap32(-rdd.id - 1)
val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
val weight = (1.0 / fraction).toFloat
candidates ++= reSampled.map(x => (x, weight))
}
RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
}
}
}
def numPartitions: Int = rangeBounds.length + 1
private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]
def getPartition(key: Any): Int = {
val k = key.asInstanceOf[K]
var partition = 0
if (rangeBounds.length <= 128) {
// If we have less than 128 partitions naive search
while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
partition += 1
}
} else {
// Determine which binary search method to use only once.
partition = binarySearch(rangeBounds, k)
// binarySearch either returns the match location or -[insertion point]-1
if (partition < 0) {
partition = -partition-1
}
if (partition > rangeBounds.length) {
partition = rangeBounds.length
}
}
if (ascending) {
partition
} else {
rangeBounds.length - partition
}
}
override def equals(other: Any): Boolean = other match {
case r: RangePartitioner[_, _] =>
r.rangeBounds.sameElements(rangeBounds) && r.ascending == ascending
case _ =>
false
}
override def hashCode(): Int = {
val prime = 31
var result = 1
var i = 0
while (i < rangeBounds.length) {
result = prime * result + rangeBounds(i).hashCode
i += 1
}
result = prime * result + ascending.hashCode
result
}
@throws(classOf[IOException])
private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
val sfactory = SparkEnv.get.serializer
sfactory match {
case js: JavaSerializer => out.defaultWriteObject()
case _ =>
out.writeBoolean(ascending)
out.writeObject(ordering)
out.writeObject(binarySearch)
val ser = sfactory.newInstance()
Utils.serializeViaNestedStream(out, ser) { stream =>
stream.writeObject(scala.reflect.classTag[Array[K]])
stream.writeObject(rangeBounds)
}
}
}
@throws(classOf[IOException])
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
val sfactory = SparkEnv.get.serializer
sfactory match {
case js: JavaSerializer => in.defaultReadObject()
case _ =>
ascending = in.readBoolean()
ordering = in.readObject().asInstanceOf[Ordering[K]]
binarySearch = in.readObject().asInstanceOf[(Array[K], K) => Int]
val ser = sfactory.newInstance()
Utils.deserializeViaNestedStream(in, ser) { ds =>
implicit val classTag = ds.readObject[ClassTag[Array[K]]]()
rangeBounds = ds.readObject[Array[K]]()
}
}
}
}
private[spark] object RangePartitioner {
/**
* Sketches the input RDD via reservoir sampling on each partition.
*
* @param rdd the input RDD to sketch
* @param sampleSizePerPartition max sample size per partition
* @return (total number of items, an array of (partitionId, number of items, sample))
*/
def sketch[K : ClassTag](
rdd: RDD[K],
sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
val shift = rdd.id
// val classTagK = classTag[K] // to avoid serializing the entire partitioner object
val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
val seed = byteswap32(idx ^ (shift << 16))
val (sample, n) = SamplingUtils.reservoirSampleAndCount(
iter, sampleSizePerPartition, seed)
Iterator((idx, n, sample))
}.collect()
val numItems = sketched.map(_._2).sum
(numItems, sketched)
}
/**
* Determines the bounds for range partitioning from candidates with weights indicating how many
* items each represents. Usually this is 1 over the probability used to sample this candidate.
*
* @param candidates unordered candidates with weights
* @param partitions number of partitions
* @return selected bounds
*/
def determineBounds[K : Ordering : ClassTag](
candidates: ArrayBuffer[(K, Float)],
partitions: Int): Array[K] = {
val ordering = implicitly[Ordering[K]]
val ordered = candidates.sortBy(_._1)
val numCandidates = ordered.size
val sumWeights = ordered.map(_._2.toDouble).sum
val step = sumWeights / partitions
var cumWeight = 0.0
var target = step
val bounds = ArrayBuffer.empty[K]
var i = 0
var j = 0
var previousBound = Option.empty[K]
while ((i < numCandidates) && (j < partitions - 1)) {
val (key, weight) = ordered(i)
cumWeight += weight
if (cumWeight >= target) {
// Skip duplicate values.
if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
bounds += key
target += step
j += 1
previousBound = Some(key)
}
}
i += 1
}
bounds.toArray
}
}
5.1.3 Custom Partitioners
Extend Partitioner and override its two abstract methods.
Example:
import org.apache.spark.Partitioner
case class Student(name:String,age:Int,gender:String)
class GenderPartitioner(numPatitions:Int) extends Partitioner{
override def numPartitions:Int = numPatitions
override def getPartition(key: Any): Int = key match {
case null => 0
//gender is a case-class field; keep the result non-negative so it is a valid partition id
case x:Student => math.abs(x.gender.hashCode) % numPatitions
case _ => throw new UnsupportedOperationException("unsupported key type")
}
override def hashCode(): Int = numPatitions;
override def equals(o: scala.Any): Boolean = o match {
case x:GenderPartitioner => x.numPartitions == numPatitions
case _ => false
}
}
5.2 Giving an RDD a Partitioner
Check an RDD's partitioner: rdd.partitioner
Question: how can we give an RDD a partitioner?
Answer:
- Use the dedicated method partitionBy with a predefined or custom partitioner.
- Call an operation that itself applies a partitioner:
- cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sortByKey(), and (if the parent RDD has a partitioner) filter, mapValues, flatMapValues; the results of all other operations have no particular partitioner.
- If the RDD already has a partitioner before one of these operations is called, the operation can benefit from it.
- In that case the operation does not need to shuffle the data, which reduces the shuffle cost.
Demo code:
import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object PartitionerTest {
def main(args: Array[String]): Unit = {
val conf=new SparkConf
conf.setMaster("local[*]")
conf.setAppName("Partitioner practice")
val sc=SparkContext.getOrCreate(conf)
val seq=Seq(
Student("larry",56,"male"),
Student("renen",50,"female"),
Student("kevin",46,"male"),
Student("tarry",36,"male"),
Student("mark",32,"male")
)
val rdd: RDD[Student] =sc.makeRDD(seq,3)
val pairRDD: RDD[(Student, Int)] =rdd.map(stu=>stu->stu.age)
println(s"Partitioner: ${pairRDD.partitioner}")
println(s"Number of partitions: ${pairRDD.getNumPartitions}")
//How can we give the RDD a partitioner?
//1. The dedicated method partitionBy
//1.1 HashPartitioner
val partitioner=new HashPartitioner(5)
//1.2 RangePartitioner
//val partitioner=new RangePartitioner(2,rdd.map(stu=>stu.age->stu))
//1.3 A custom partitioner
//val partitioner=new GenderPartitioner(2)
val partitionerRDD=pairRDD.partitionBy(partitioner)
println(s"Partitioner: ${partitionerRDD.partitioner}")
println(s"Number of partitions: ${partitionerRDD.getNumPartitions}")
//2. Operations that apply a partitioner themselves
val nameRDD=rdd.map(stu=>stu.name->stu)
val groupRDD: RDD[(String, Iterable[Student])] = nameRDD.groupByKey()
//val groupRDD: RDD[(String, Iterable[Student])] =nameRDD.groupByKey(partitioner = new HashPartitioner(6))
println(s"Partitioner: ${groupRDD.partitioner}")
println(s"Number of partitions: ${groupRDD.getNumPartitions}")
val sortRDD: RDD[(String, Student)] = nameRDD.sortByKey()
println(s"Partitioner: ${sortRDD.partitioner}")
println(s"Number of partitions: ${sortRDD.getNumPartitions}")
//Compare the following two lines: map drops the partitioner, mapValues keeps it
groupRDD.map(elem=>(elem._1,elem._2.map(_.age))).sortByKey().foreach(println)
groupRDD.mapValues(elem=>elem.map(_.age)).sortByKey().foreach(println)
//Release resources
sc.stop();
}
}
Notes:
- For binary operations, the partitioning of the output depends on the parent RDDs' partitioners.
- By default the result is hash-partitioned, with as many partitions as the operation's parallelism.
- If one of the parent RDDs already has a partitioner, the result uses that partitioner.
- If both parents have partitioners, the result uses the partitioner of the first parent. A small sketch of these rules follows.
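A small sketch of these rules on made-up data: when neither parent has a partitioner, join falls back to a HashPartitioner chosen by the default rules; when one parent is already hash-partitioned, the result reuses that partitioner.
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
object BinaryOpPartitionerDemo {
  def main(args: Array[String]): Unit = {
    val sc = SparkContext.getOrCreate(
      new SparkConf().setMaster("local[*]").setAppName("binary-op-partitioner"))
    val left  = sc.makeRDD(Seq(("a", 1), ("b", 2)), 4)
    val right = sc.makeRDD(Seq(("a", "x"), ("b", "y")), 2)
    // Neither parent has a partitioner: the result gets a default HashPartitioner.
    println(left.join(right).partitioner)   // e.g. Some(HashPartitioner with 4 partitions)
    // One parent is pre-partitioned: the result reuses that partitioner.
    val hashed = left.partitionBy(new HashPartitioner(6))
    println(hashed.join(right).partitioner) // the HashPartitioner(6) of hashed
    sc.stop()
  }
}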
6. Preferred Locations
Each partition stores a list of preferred locations.
- For an HDFS file, this list holds the locations of the blocks that back each partition.
Note: following the principle that moving computation is cheaper than moving data, Spark's task scheduler tries to assign each task to a node that already stores that task's data.
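Preferred locations can be inspected per partition through rdd.preferredLocations. A minimal sketch (the HDFS path is hypothetical): for an HDFS-backed RDD the list contains the hosts storing each block, while for a locally parallelized collection it is typically empty.
import org.apache.spark.{SparkConf, SparkContext}
object PreferredLocationsDemo {
  def main(args: Array[String]): Unit = {
    val sc = SparkContext.getOrCreate(
      new SparkConf().setMaster("local[*]").setAppName("preferred-locations-demo"))
    // For an HDFS file the preferred locations are the hosts holding each block.
    val rdd = sc.textFile("hdfs://namenode:9000/data/users.dat")
    rdd.partitions.foreach { p =>
      println(s"partition ${p.index} -> ${rdd.preferredLocations(p).mkString(",")}")
    }
    sc.stop()
  }
}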