1.任务提交分析
这里以org.apache.spark.examples.SparkPi为例。当执行reduce(_+_)方法时,其底层调用了sc.runJob方法。核心代码如下:
/**
* 注释:(rdd, func, partitions, callSite, resultHandler, properties)
* 1、应用程序调用 action 算子
* 2、sparkContext.runJob()
* 3、dagScheduler.runJob()
* 4、TaskScheduler.submitTasks(new TaskSet())
* 5、SchedulerBackEnd.driverEndpoint 提交任务
*/
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
其中runJob方法中执行的核心代码:
/**
* TODO 注释: 提交任务
* 参数解析:
* 1、rdd:要在其上运行任务的参数RDD目标RDD
* 2、func:在RDD的每个分区上运行的函数
* 3、partitions:要运行的分区的集;某些作业可能不希望在目标RDD的所有分区上进行计算,例如,对于 first() 之类的操作。
* 4、callSite:在用户程序中调用此作业的位置
* 5、resultHandler:回调函数,以将每个分区结果传递给Xxx
* 6、properties:要附加到此作业的scheduler属性,例如fair scheduler pool name
*
* rdd1.xx1().xx2().xx3().xx4() 这里的 rdd = rdd1.xx1().xx2().xx3()
*
*/
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
其内部核心代码为:
/**
* TODO 注释:
* 第一步:封装一个JobWaiter对象;
* 第二步:将 JobWaiter 对象赋值给 JobSubmitted 的 listener 属性,
* 并将 JobSubmitted(DAGSchedulerEvent事件)对象传递给 eventProcessLoop 事件循环处理器。
* eventProcessLoop 内部事件消息处理线程将会接收 JobSubmitted 事件,
* 并调用dagScheduler.handleJobSubmitted(...) 方法来处理事件;
* 第三步:返回 JobWaiter 对象。
*/
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
/**
* TODO 注释:这是提交任务运行
* eventProcessLoop 就是当初 DAGScheduler 在初始化的时候,创建的一个 DAGSchedulerEventProcessLoop
* 这个组件主要负责:任务的提交执行
* 把 JobSubmitted 这个消息,放入了 eventQueue 队列中
*/
eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties)))
// TODO 注释: 返回结果对象的引用
waiter
通过eventProcessLoop.post(...)将任务放入了eventQueue队列中,有通过eventProcessLoop.start()方法将任务提交。前面有见到以下代码是等待任务的提交,正好有对应上:
/**
* TODO 注释: driver 中,初始化了一个 dagSchedudelr
* 它里面又初始化了一个 eventThread 专门用来处理 JobSubmitted
*/
// Exposed for testing.
private[spark] val eventThread = new Thread(name) {
setDaemon(true)
override def run(): Unit = {
try {
while (!stopped.get) {
// TODO 注释:获取消息
// TODO 注释:一定要注意:当 sparkContext 还没有初始化好的时候,是不执行 sc.runJob 提交任务的。
// TODO 注释:当执行 sc.runJob(sc) 的时候,就会提交 Job 到这儿来。
val event = eventQueue.take()
try {
/**
* TODO 注释:根据事件的类型,调用不同的 handleXXX 方法来进行处理。
* 当接收到任务提交的时候: event = JobSubmitted
*/
onReceive(event)
} catch {
case NonFatal(e) => try {
onError(e)
} catch {
case NonFatal(e) => logError("Unexpected error in " + name, e)
}
}
}
} catch {
case ie: InterruptedException => // exit even if eventQueue is not empty
case NonFatal(e) => logError("Unexpected error in " + name, e)
}
}
}
在这里通过onReceive进行任务的提交,任务提交给了DAGScheduler
2.Stage切分与提交
任务提交核心代码如下,包含了2个方面,Stage切分与Task分发与执行。在DAGScheduler.handleJobSubmitted(...)中有如下代码:
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
这个finalRDD就是rdd链条中的最后一个RDD,也就是触发sc.runJob()方法执行的RDD。必然是针对某个RDD调用了一个action算子才触发执行的,则该RDD就是finalRDD。
注意以下概念:
- ShuffleMapStage + ResultStage
- ShuffleMapTask + ResultTask
- ShuffleDependency + NarrowDependency
在createResultStage(...)方法中,做了以下事情:
/**
* TODO Create a ResultStage associated with the provided jobId.
* 进行Stage切分的详细方法实现
*/
private def createResultStage(rdd: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], jobId: Int,
callSite: CallSite): ResultStage = {
checkBarrierStageWithDynamicAllocation(rdd)
checkBarrierStageWithNumSlots(rdd)
checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
// TODO 注释:获得父stage,若没有shuffle则返回空List
// TODO 注释:获取当前Stage的parent Stage,这个方法是划分Stage的核心实现
// 所有的父类stage都已经构建完成并返回给parents。
val parents = getOrCreateParentStages(rdd, jobId)
// TODO 注释: finalStage=resultStage 的 stageID 这里返回的是最后一个stage的Id
val id = nextStageId.getAndIncrement()
// TODO 注释:创建当前最后的ResultStage
// TODO 注释:parents 所有的 父 stage
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
// TODO 注释:将 ResultStage 与 stageId 相关联, 保存在 map 中
// TODO 注释:stageIdToStage = new HashMap[Int, Stage]
stageIdToStage(id) = stage
// TODO 注释:更新该job中包含的stage
updateJobIdStageIdMaps(jobId, stage)
// TODO 注释:返回ResultStage
stage
}