1. RDD action -> SparkContext.runJob -> DAGScheduler.runJob
/**
 * Submits a job through `submitJob` and blocks the caller until it has finished.
 *
 * @param rdd           RDD whose partitions the job runs over
 * @param func          function applied to each selected partition
 * @param partitions    indices of the partitions to compute
 * @param callSite      description of the user call site, used in log messages
 * @param allowLocal    forwarded unchanged to `submitJob`
 * @param resultHandler callback receiving (index, result) pairs
 * @param properties    scheduler properties forwarded to `submitJob`; may be null
 * @throws Exception the exception carried by `JobFailed` when the job does not succeed
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: String,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties = null): Unit = {
  val jobWaiter =
    submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
  val outcome = jobWaiter.awaitResult()
  outcome match {
    case JobFailed(exception: Exception) =>
      // Surface the failure to the caller after recording where it came from.
      logInfo("Failed to run " + callSite)
      throw exception
    case JobSucceeded =>
      ()
  }
}
2. submitJob
/**
 * Submit a job to the job scheduler and get a JobWaiter object back. The JobWaiter object
 * can be used to block until the job finishes executing or can be used to cancel the job.
 *
 * @param rdd           RDD whose partitions the job runs over
 * @param func          function applied to each selected partition
 * @param partitions    indices of the partitions to compute; each must lie in
 *                      `[0, rdd.partitions.length)`
 * @param callSite      description of the user call site
 * @param allowLocal    forwarded in the `JobSubmitted` message
 * @param resultHandler callback handed to the returned waiter
 * @param properties    scheduler properties forwarded in the `JobSubmitted` message; may be null
 * @return a waiter for the job; when `partitions` is empty the waiter is created for zero
 *         tasks and no `JobSubmitted` message is sent
 * @throws IllegalArgumentException if any requested partition index is out of range
 */
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: String,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties = null): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  // A job id is consumed even for an empty job, exactly as before.
  val jobId = nextJobId.getAndIncrement()
  if (partitions.isEmpty) {
    new JobWaiter[U](this, jobId, 0, resultHandler)
  } else {
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    // Hand the job over to the event-processing actor.
    // NOTE(review): the original author asked how the RDD becomes a message here and whether
    // only its metadata (partition locations etc.) is serialized. Presumably the actor runs
    // in the same JVM and receives the reference as-is - confirm.
    eventProcessActor ! JobSubmitted(
      jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
    waiter
  }
}
3. DAGSchedulerEventProcessActor
/**
 * Actor that receives scheduler events and forwards each one to the matching handler
 * method of the supplied `DAGScheduler`.
 *
 * @param dagScheduler scheduler whose handler methods are invoked for every message
 */
private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
  extends Actor with Logging {

  override def preStart(): Unit = {
    // set DAGScheduler for taskScheduler to ensure eventProcessActor is always
    // valid when the messages arrive
    dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
  }

  /**
   * The main event loop of the DAG scheduler.
   */
  def receive = {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties)

    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)

    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    // NOTE(review): the excerpt ends here. Any further message cases or members of the
    // original class are not shown and have deliberately not been reconstructed.
  }
}
4. actor调用 handleJobSubmitted
/**
 * Handles a `JobSubmitted` event: builds the final stage for the job and then either runs
 * the job locally or registers it as active and submits its final stage.
 *
 * If creating the final stage throws, the listener is told the job failed and nothing
 * else happens. Otherwise `submitWaitingStages()` is always called before returning.
 *
 * @param jobId      id of the job being submitted
 * @param finalRDD   RDD the final stage is built on
 * @param func       function the job applies to each partition
 * @param partitions partition indices requested by the job
 * @param allowLocal whether the job may be run through `runLocally`
 * @param callSite   description of the user call site, used for the stage and in logs
 * @param listener   notified via `jobFailed` when stage creation throws
 * @param properties scheduler properties attached to the job; may be null
 */
private[scheduler] def handleJobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    allowLocal: Boolean,
    callSite: String,
    listener: JobListener,
    properties: Properties = null): Unit = {
  val finalStage: Stage =
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      newStage(finalRDD, partitions.size, None, jobId, Some(callSite))
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }

  if (finalStage != null) {
    val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
      job.jobId, callSite, partitions.length, allowLocal))
    logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val runsLocally = allowLocal && finalStage.parents.isEmpty && partitions.length == 1
    if (runsLocally) {
      // Compute very short actions like first() or take() with no parent stages locally.
      // A single partition, no parent stages and local execution allowed: runLocally
      // performs the computation on its own thread.
      listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
      runLocally(job)
    } else {
      jobIdToActiveJob(jobId) = job
      activeJobs += job
      resultStageToJob(finalStage) = job
      val stageIds = jobIdToStageIds(jobId).toArray
      listenerBus.post(SparkListenerJobStart(job.jobId, stageIds, properties))
      submitStage(finalStage)
    }
  }

  submitWaitingStages()
}
/**
 * Create a Stage -- either directly for use as a result stage, or as part of the
 * (re)-creation of a shuffle map stage in newOrUsedStage. The stage will be associated with
 * the provided jobId. Production of shuffle map stages should always use newOrUsedStage,
 * not newStage directly.
 *
 * The new stage is recorded in `stageIdToStage`, in the job/stage id maps and in
 * `stageToInfos` before it is returned.
 *
 * @param rdd        RDD the stage is built on
 * @param numTasks   number of tasks passed to the `Stage` constructor
 * @param shuffleDep shuffle dependency for a shuffle map stage, `None` otherwise
 * @param jobId      job the stage is associated with
 * @param callSite   optional description of the user call site
 * @return the newly created and registered stage
 */
private def newStage(
    rdd: RDD[_],
    numTasks: Int,
    shuffleDep: Option[ShuffleDependency[_,_]],
    jobId: Int,
    callSite: Option[String] = None): Stage = {
  // The id is taken before the parent stages are resolved, matching the original order.
  val stageId = nextStageId.getAndIncrement()
  val parentStages = getParentStages(rdd, jobId)
  val created = new Stage(stageId, rdd, numTasks, shuffleDep, parentStages, jobId, callSite)

  stageIdToStage.update(stageId, created)
  updateJobIdStageIdMaps(jobId, created)
  stageToInfos.update(created, StageInfo.fromStage(created))
  created
}
/**
 * Run a job on an RDD locally, assuming it has only a single partition and no dependencies.
 * We run the operation in a separate thread just in case it takes a bunch of time, so that
 * we don't block the DAGScheduler event loop or other concurrent jobs.
 *
 * @param job the job to compute; handed to `runLocallyWithinThread` on the new thread
 */
protected def runLocally(job: ActiveJob): Unit = {
  logInfo("Computing the requested partition locally")
  val localWorker = new Thread("Local computation of job " + job.jobId) {
    override def run(): Unit = runLocallyWithinThread(job)
  }
  localWorker.start()
}
5. submitStage: 如果parent stage有缺失,则先递归提交缺失的parent stage,并把当前stage放入waitingStages;如果没有缺失,则直接为当前stage提交task。
/**
 * Submits stage, but first recursively submits any missing parents.
 *
 * A stage that is already waiting, running or failed is left untouched. When no active job
 * exists for the stage, the stage is aborted instead.
 *
 * @param stage the stage to submit
 */
private def submitStage(stage: Stage): Unit = {
  activeJobForStage(stage) match {
    case Some(jobId) =>
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        // Parents are submitted in ascending stage-id order.
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          // Mark the stage as running BEFORE submitting its tasks. submitMissingTasks removes
          // the stage from runningStages when it aborts or finds nothing to run; adding it
          // afterwards would silently undo that removal.
          runningStages += stage
          submitMissingTasks(stage, jobId)
        } else {
          missing.foreach(submitStage)
          waitingStages += stage
        }
      }

    case None =>
      abortStage(stage, "No active job for stage " + stage.id)
  }
}
/**
 * Collects the shuffle map stages that `stage` depends on and that are not yet available.
 *
 * Starting from `stage.rdd`, the lineage is walked through narrow dependencies. The walk
 * does not continue past an RDD for which `getCacheLocs` reports no empty location list.
 *
 * @param stage the stage whose parents are inspected
 * @return the unavailable parent shuffle map stages, in no particular order
 */
private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]

  def visit(rdd: RDD[_]): Unit = {
    if (!visited(rdd)) {
      visited += rdd
      // An empty location list (Nil) among the cache locations means the RDD is treated
      // as missing, so its dependencies have to be examined.
      if (getCacheLocs(rdd).contains(Nil)) {
        for (dep <- rdd.dependencies) {
          // Two cases: a shuffle dependency maps to a shuffle map stage, while a narrow
          // dependency is followed recursively.
          dep match {
            case shufDep: ShuffleDependency[_,_] =>
              val mapStage = getShuffleMapStage(shufDep, stage.jobId)
              if (!mapStage.isAvailable) {
                missing += mapStage
              }
            case narrowDep: NarrowDependency[_] =>
              visit(narrowDep.rdd)
          }
        }
      }
    }
  }

  visit(stage.rdd)
  missing.toList
}
6. submitMissingTasks
/**
 * Called when stage's parents are available and we can now do its task.
 *
 * Builds one task per partition that still needs computing, posts a
 * `SparkListenerStageSubmitted` event and, if there is anything to run, hands the tasks to
 * the task scheduler as a single `TaskSet`. The stage is removed from `runningStages` when
 * the first task turns out not to be serializable or when no task needs to run.
 *
 * @param stage the stage whose missing tasks are submitted
 * @param jobId id of the active job on whose behalf the stage is submitted
 */
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
  logDebug("submitMissingTasks(" + stage + ")")
  // Get our pending tasks and remember them in our pendingTasks entry
  val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet)
  myPending.clear()
  val tasks = ArrayBuffer[Task[_]]()
  if (stage.isShuffleMap) {
    // Create a ShuffleMapTask for every partition whose output location list is empty.
    for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
      val locs = getPreferredLocs(stage.rdd, p)
      tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
    }
  } else {
    // This is a final stage; figure out its job's missing partitions
    val job = resultStageToJob(stage)
    for (id <- 0 until job.numPartitions if !job.finished(id)) {
      val partition = job.partitions(id)
      val locs = getPreferredLocs(stage.rdd, partition)
      // Create a ResultTask for every output of the job that has not finished yet.
      tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
    }
  }

  // Look the properties up under the same key that is tested for presence. The previous
  // code tested `jobId` but then read `stage.jobId`, which throws when only `jobId` is
  // registered. Without an active job the stage will be assigned to the "default" pool.
  val properties = jobIdToActiveJob.get(jobId).map(_.properties).orNull

  // must be run listener before possible NotSerializableException
  // should be "StageSubmitted" first and then "JobEnded"
  listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage), properties))

  if (tasks.nonEmpty) {
    // Preemptively serialize a task to make sure it can be serialized. We are catching this
    // exception here because it would be fairly hard to catch the non-serializable exception
    // down the road, where we have several different implementations for local scheduler and
    // cluster schedulers.
    try {
      SparkEnv.get.closureSerializer.newInstance().serialize(tasks.head)
    } catch {
      case e: NotSerializableException =>
        abortStage(stage, "Task not serializable: " + e.toString)
        runningStages -= stage
        return
    }

    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    myPending ++= tasks
    logDebug("New pending tasks: " + myPending)
    // Bundle the tasks into one TaskSet and pass it to the task scheduler.
    taskScheduler.submitTasks(
      new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
    stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
  } else {
    logDebug("Stage " + stage + " is actually done; %b %d %d".format(
      stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
    runningStages -= stage
  }
}
7. TaskSet: 某个rdd的一部分partition缺失(missing)时,会通过上面的步骤找到这些partition,为它们生成对应的task,并打包成一个TaskSet一起调度。
/**
 * A set of tasks submitted together to the low-level TaskScheduler, usually representing
 * missing partitions of a particular stage.
 *
 * @param tasks      the tasks that make up this set
 * @param stageId    id of the stage the tasks belong to
 * @param attempt    attempt number, combined with `stageId` to form `id`
 * @param priority   priority value carried along with the set
 * @param properties scheduler properties carried along with the set
 */
private[spark] class TaskSet(
    val tasks: Array[Task[_]],
    val stageId: Int,
    val attempt: Int,
    val priority: Int,
    val properties: Properties) {

  /** Identifier of the form "<stageId>.<attempt>". */
  val id: String = stageId + "." + attempt

  /** Asks every task in the set to stop, forwarding `interruptThread` to each of them. */
  def kill(interruptThread: Boolean): Unit = {
    tasks.foreach(_.kill(interruptThread))
  }

  override def toString: String = "TaskSet " + id
}
8. taskScheduler.submitTasks
/**
 * Registers a new task set with the scheduler and asks the backend to revive offers.
 *
 * While holding this scheduler's lock, a `TaskSetManager` is created for the set, recorded
 * in `activeTaskSets` and added to the schedulable builder. For the first task set received
 * by a non-local scheduler, a repeating timer is started that warns until a task has
 * launched and then cancels itself.
 *
 * @param taskSet the tasks to schedule
 */
override def submitTasks(taskSet: TaskSet): Unit = {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
    activeTaskSets(taskSet.id) = manager
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run(): Unit = {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient memory")
          } else {
            // `this` is the TimerTask: stop warning once a task has launched.
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()
}