开发者学堂课程【大数据实时计算框架 Spark 快速入门: SparkPi 代码剖析1】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/100/detail/1676
SparkPi 代码剖析1
内容介绍:
一、SparkPi 代码剖析
二、递归的方法来倒着往前切割 stage
一、SparkPi 代码剖析
RDD.scala
910
def foreach()
sc.runJob
SparkContext.scala
1928
def runRob ()
1818
1832
dagScheduler.runJob
Dagscheduler.scala
603
611
valwaiter = submitJob()
583
eventProcessLoop.post(Jobsubmitted(
184
private[scheduler] val eventprocessLoop- new DAGSchedulerEventProcessloop(this)
1605
private def doonReceive (event: DAGSchedulerEvent): Unit = event match {
1607
dagscheduler.handleJobSubmitted(
837
首先创建了一个finalstage
最后一个stage是Resultstage
861
submitstage(finalstage)
912
二、递归的方法来倒着往前切割 stage
917
val missing = getMissingParentstages (stage).sortBy(_.id)
433
private def getMissingParentstages
这个方法里面通过 visit 方法和 stack 数据结构
往前去找 fina1RDD 的依赖
477
如果是宽依赖
就生成一个 ShuffleMapstage
如果是窄依赖,是不是就是继续压栈
这个地方最后会返回一个 List ,从 set 转成 List 可以排序
921
submitMissingTasks(stage,jobId.get)
来判断父 stage 里面是否为空,如果为空了就开始从前往后的提交 task
935
private def submitMissingTasks (stage: Stage, jobId: Int)
941
val partitionsTocompute: Seq[Int] = stage. findMissingPartitions()
这里里面放的就是一个 stage 里面的 partition ids
966
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try
{这里里面干的事情就是要把 partitionid 对应的 preferLocations 计算出来有的RDD 有 preferLocations 警如 HadoopRDD
969
partitionsTocompute.map {id => (id, getPreferredLocs(
1529
private def getPreferredLocsInternal(
这里方法里面就是去计算最近位置
1540
val cached = getcachelocs(rdd)(partition)
首先会看 RDD 里面的 partition 有没有 cache 过,有的话 cache 的地址就是最近开始如果没有缓存
1545
val rddPrefs- rdd, preferredlocations (rdd. partitions (partition). tolist
就是根据 partition 所在的 RDD 来获取最佳位置