Driver side: sending tasks
1. TaskSchedulerImpl calls the submitTasks method
override def submitTasks(taskSet: TaskSet): Unit = {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    // Mark every earlier attempt of this stage as a zombie so it stops launching new tasks
    stageTaskSets.foreach { case (_, ts) =>
      ts.isZombie = true
    }
    stageTaskSets(taskSet.stageAttemptId) = manager
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    ...
The main work here is creating a TaskSetManager for the TaskSet. The TaskSetManager tracks all of the tasks in the TaskSet and allocates resources to them, handing out tasks according to data locality so that each task runs as close to its data as possible.
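For context, here is a minimal, self-contained example of what drives this code path: any RDD action makes the DAGScheduler build a stage and hand a TaskSet (one task per partition) to TaskSchedulerImpl.submitTasks. The app name and master below are illustrative.

import org.apache.spark.{SparkConf, SparkContext}

object SubmitTasksDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("submit-tasks-demo").setMaster("local[2]"))
    // A 4-partition RDD yields a TaskSet with 4 tasks for the single result stage;
    // count() triggers DAGScheduler -> TaskSchedulerImpl.submitTasks as shown above.
    val n = sc.parallelize(1 to 100, numSlices = 4).count()
    println(s"counted $n elements")
    sc.stop()
  }
}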
2. SchedulableBuilder has two implementations, one for FIFO and one for FAIR scheduling. The scheduling mode is configurable and determines the order in which the TaskSetManagers (and therefore their tasks) are scheduled; Spark's default is FIFO. A configuration sketch follows.
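A brief sketch of switching to FAIR scheduling via the standard spark.scheduler.mode property; the allocation-file path and pool name below are placeholders, not required settings.

import org.apache.spark.{SparkConf, SparkContext}

// Switch from the default FIFO scheduler to FAIR scheduling.
val conf = new SparkConf()
  .setAppName("fair-scheduling-demo")
  .set("spark.scheduler.mode", "FAIR")
  // Optional: XML file defining scheduling pools (placeholder path)
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")

val sc = new SparkContext(conf)
// Jobs submitted from this thread go into the named pool (placeholder pool name).
sc.setLocalProperty("spark.scheduler.pool", "reporting")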
3. makeOffers and reviveOffers in CoarseGrainedSchedulerBackend (how the two connect is sketched after the excerpt below)
// Make fake resource offers on all executors
private def makeOffers(): Unit = {
  // Make sure no executor is killed while some task is launching on it
  val taskDescs = withLock {
    // Filter out executors under killing
    ...
  }
  ...
}

override def reviveOffers(): Unit = {
  driverEndpoint.send(ReviveOffers)
}
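The two methods connect through the driver endpoint's message loop: reviveOffers sends a ReviveOffers message to the DriverEndpoint, which reacts by calling makeOffers. A condensed sketch of that handler (abridged, not the exact source; other messages omitted):

override def receive: PartialFunction[Any, Unit] = {
  case ReviveOffers =>
    // Sent by TaskSchedulerImpl via reviveOffers() (and periodically by a revive
    // timer) so that free executor resources are offered to the scheduler again.
    makeOffers()
  // other cases (StatusUpdate, KillTask, ...) omitted
}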
4. resourceOffers in TaskSchedulerImpl takes the available executors (as WorkerOffers) as input and returns TaskDescriptions, each carrying the task ID, the executor ID, and the task's execution environment. This is where the TaskSetManagers get their resources: tasks that cannot run yet wait for better locality, and can be re-assigned to other executors. A simplified slot-counting sketch follows the excerpt below.
/**
 * Called by cluster manager to offer resources on slaves. We respond by asking our active task
 * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
 * that tasks are balanced across the cluster.
 */
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  var newExecAvail = false
  for (o <- offers) {
    if (!hostToExecutors.contains(o.host)) {
      hostToExecutors(o.host) = new HashSet[String]()
    }
    if (!executorIdToRunningTaskIds.contains(o.executorId)) {
      hostToExecutors(o.host) += o.executorId
      executorAdded(o.executorId, o.host)
      executorIdToHost(o.executorId) = o.host
      executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
      newExecAvail = true
    }
  }
  val hosts = offers.map(_.host).distinct
  for ((host, Some(rack)) <- hosts.zip(getRacksForHosts(hosts))) {
    hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += host
  }
  ...
  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  for (taskSet <- sortedTaskSets) {
    val availableSlots = availableCpus.map(c => c / CPUS_PER_TASK).sum
    // Skip the barrier taskSet if the available slots are less than the number of pending tasks.
    if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
      ...
    }
    ...
  }
  ...
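To make the slot arithmetic above concrete, here is a small self-contained sketch. Offer is a simplified stand-in for Spark's WorkerOffer (not the real class), and the round-robin dealing is only a rough picture of what the per-offer loop inside resourceOffers does.

// Simplified stand-in for Spark's WorkerOffer (illustrative only).
case class Offer(executorId: String, host: String, cores: Int)

object SlotDemo extends App {
  val cpusPerTask = 1 // spark.task.cpus, i.e. CPUS_PER_TASK above (default 1)

  val offers = IndexedSeq(
    Offer("exec-1", "host-a", cores = 4),
    Offer("exec-2", "host-b", cores = 2))

  // Same arithmetic as `availableCpus.map(c => c / CPUS_PER_TASK).sum` above.
  val availableSlots = offers.map(_.cores / cpusPerTask).sum
  println(s"available slots: $availableSlots") // prints 6

  // Deal one task at a time, cycling over executors that still have a free slot --
  // a rough picture of the round-robin balancing described in the scaladoc above.
  val freeCores = offers.map(_.cores).toBuffer
  var nextTask = 1
  var round = 0
  while (nextTask <= availableSlots) {
    val idx = round % offers.length
    if (freeCores(idx) >= cpusPerTask) {
      println(s"task-$nextTask -> ${offers(idx).executorId}")
      freeCores(idx) -= cpusPerTask
      nextTask += 1
    }
    round += 1
  }
}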
5. The TaskDescriptions returned by resourceOffers are then serialized and sent to the chosen executor, which receives them through its LaunchTask path and runs the tasks; a sketch of that hand-off follows.
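The hand-off looks roughly like the following abridged sketch of launchTasks in CoarseGrainedSchedulerBackend (not the exact source; the real code also checks the serialized size against the RPC message limit before sending):

private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
  for (task <- tasks.flatten) {
    val serializedTask = TaskDescription.encode(task)    // serialize to a ByteBuffer
    val executorData = executorDataMap(task.executorId)  // driver-side bookkeeping for that executor
    executorData.freeCores -= scheduler.CPUS_PER_TASK    // reserve the task's cores
    // CoarseGrainedExecutorBackend receives LaunchTask, decodes the description and runs the task.
    executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
  }
}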
Driver side: receiving task results
1. statusUpdate in TaskSchedulerImpl receives the task status updates that executors report back (the RPC message that carries them is sketched after the excerpt below).
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer): Unit = {
  var failedExecutor: Option[String] = None
  var reason: Option[ExecutorLossReason] = None
  synchronized {
    try {
      Option(taskIdToTaskSetManager.get(tid)) match {
        case Some(taskSet) =>
          if (state == TaskState.LOST) {
            // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
            // where each executor corresponds to a single task, so mark the executor as failed.
            ...
          }
          ...
          if (TaskState.isFinished(state)) {
            cleanupTaskState(tid)
            taskSet.removeRunningTask(tid)
            if (state == TaskState.FINISHED) {
              taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
            } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
              taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
            }
          }
          ...
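For completeness, the updates arrive as StatusUpdate RPC messages at the driver endpoint, which forwards them to the statusUpdate method above and, once a task has finished, frees its cores and makes new offers. A condensed sketch (abridged, not the exact source):

case StatusUpdate(executorId, taskId, state, data) =>
  // Hand the serialized result (or failure reason) to TaskSchedulerImpl.statusUpdate above.
  scheduler.statusUpdate(taskId, state, data.value)
  if (TaskState.isFinished(state)) {
    executorDataMap.get(executorId) match {
      case Some(executorInfo) =>
        executorInfo.freeCores += scheduler.CPUS_PER_TASK // give the task's cores back
        makeOffers(executorId)                            // try to schedule more work on it
      case None =>
        // Executor has already been removed; nothing to release.
    }
  }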
2. Each incoming result is handed to a thread pool in TaskResultGetter for deserialization and processing (a configuration sketch follows at the end of this section).
def enqueueSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    serializedData: ByteBuffer): Unit = {
  getTaskResultExecutor.execute(new Runnable {
    override def run(): Unit = {
      ...
      // Deserialize the TaskResult: either the value itself (DirectTaskResult)
      // or a handle to a block that holds it (IndirectTaskResult)
      val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
        case directResult: DirectTaskResult[_] =>
          ...
        // Check the size of the result reported back: results larger than
        // spark.driver.maxResultSize (1 GB by default) are dropped
        case IndirectTaskResult(blockId, size) =>
          if (!taskSetManager.canFetchMoreResults(size)) {
            // dropped by executor if size is larger than maxResultSize
            sparkEnv.blockManager.master.removeBlock(blockId)
            // kill the task so that it will not become zombie task
            scheduler.handleFailedTask(taskSetManager, tid, TaskState.KILLED, TaskKilled(
              "Tasks result size has exceeded maxResultSize"))
            return
          }
          // Fetch the serialized result that the executor stored in its block manager
          val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
          if (serializedTaskResult.isEmpty) {
            /* We won't be able to get the task result if the machine that ran the task failed
             * between when the task ended and when we tried to fetch the result, or if the
             * block manager had to flush the result. */
            scheduler.handleFailedTask(
              taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
            return
          }
          val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
            serializedTaskResult.get.toByteBuffer)
          // force deserialization of referenced value
          deserializedResult.value(taskResultSerializer.get())
          sparkEnv.blockManager.master.removeBlock(blockId)
          (deserializedResult, size)
      }
      ...
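Two knobs affect the result path shown above: spark.driver.maxResultSize caps the total serialized result size the driver will accept for one action (1g by default, 0 for unlimited), and the TaskResultGetter pool size controls how many results are deserialized in parallel. A small configuration sketch; the values are illustrative.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("result-path-demo")
  // Limit on the total serialized result size per action; tasks pushing past it
  // are killed with "Tasks result size has exceeded maxResultSize", as shown above.
  .set("spark.driver.maxResultSize", "2g")
  // Size of the TaskResultGetter thread pool (defaults to 4 in the Spark versions
  // this excerpt comes from) -- the pool backing getTaskResultExecutor above.
  .set("spark.resultGetter.threads", "8")

val sc = new SparkContext(conf)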