Spark Runtime里的主要层次分析,梳理Runtime组件和运行流程,
DAGScheduler
Job=多个stage,Stage=多个同种task, Task分为ShuffleMapTask和ResultTask,Dependency分为ShuffleDependency和NarrowDependency
面向stage的切分,切分依据为宽依赖
维护waiting jobs和active jobs。维护waiting stages、active stages和failed stages,以及与jobs的映射关系
主要职能
- 接收提交Job的主入口。
submitJob(rdd, ...)
或runJob(rdd, ...)
。在SparkContext
里会调用这两个方法。- 生成一个Stage并提交,接着推断Stage是否有父Stage未完毕,若有。提交并等待父Stage。以此类推。结果是:DAGScheduler里添加了一些waiting stage和一个running stage。
- running stage提交后。分析stage里Task的类型,生成一个Task描写叙述,即TaskSet。
- 调用
TaskScheduler.submitTask(taskSet, ...)
方法,把Task描写叙述提交给TaskScheduler。TaskScheduler依据资源量和触发分配条件,会为这个TaskSet分配资源并触发运行。 -
DAGScheduler
提交job后。异步返回JobWaiter
对象。能够返回job运行状态,能够cancel job,运行成功后会处理并返回结果
- 处理
TaskCompletionEvent
- 假设task运行成功,相应的stage里减去这个task。做一些计数工作:
- 假设task是ResultTask,计数器
Accumulator
加一。在job里为该task置true,job finish总数加一。加完后假设finish数目与partition数目相等。说明这个stage完毕了,标记stage完毕。从running stages里减去这个stage,做一些stage移除的清理工作
- 假设task是ShuffleMapTask。计数器
Accumulator
加一,在stage里加上一个output location。里面是一个MapStatus
类。MapStatus
是ShuffleMapTask
运行完毕的返回,包含location信息和block size(能够选择压缩或未压缩)。同一时候检查该stage完毕,向MapOutputTracker
注冊本stage里的shuffleId和location信息。然后检查stage的output location里是否存在空。若存在空。说明一些task失败了,整个stage又一次提交;否则,继续从waiting stages里提交下一个须要做的stage
- 假设task是ResultTask,计数器
- 假设task是重提交,相应的stage里添加这个task
- 假设task是fetch失败,立即标记相应的stage完毕。从running stages里减去。
假设不同意retry。abort整个stage;否则,又一次提交整个stage。
另外,把这个fetch相关的location和map任务信息。从stage里剔除,从
MapOutputTracker
注销掉。最后,假设这次fetch的blockManagerId对象不为空,做一次ExecutorLost
处理,下次shuffle会换在还有一个executor上去运行。 - 其它task状态会由
TaskScheduler
处理,如Exception, TaskResultLost, commitDenied等。
- 假设task运行成功,相应的stage里减去这个task。做一些计数工作:
- 其它与job相关的操作还包含:cancel job, cancel stage, resubmit failed stage等
其它职能
1. cacheLocations 和 preferLocation
private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
TaskScheduler
维护task和executor相应关系,executor和物理资源相应关系。在排队的task和正在跑的task。
内部维护一个任务队列。依据FIFO或Fair策略,调度任务。
TaskScheduler
本身是个接口,spark里仅仅实现了一个TaskSchedulerImpl
。理论上任务调度能够定制。以下是TaskScheduler
的主要接口:
def start(): Unit
def postStartHook() { }
def stop(): Unit
def submitTasks(taskSet: TaskSet): Unit
def cancelTasks(stageId: Int, interruptThread: Boolean)
def setDAGScheduler(dagScheduler: DAGScheduler): Unit
def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
blockManagerId: BlockManagerId): Boolean
主要职能
-
submitTasks(taskSet)
,接收DAGScheduler
提交来的tasks- 为tasks创建一个
TaskSetManager
,加入到任务队列里。TaskSetManager
跟踪每一个task的运行状况,维护了task的很多详细信息。 - 触发一次资源的索要。
- 首先。
TaskScheduler
对比手头的可用资源和Task队列。进行executor分配(考虑优先级、本地化等策略),符合条件的executor会被分配给TaskSetManager
。 - 然后。得到的Task描写叙述交给
SchedulerBackend
。调用launchTask(tasks)
。触发executor上task的运行。task描写叙述被序列化后发给executor,executor提取task信息。调用task的
run()
方法运行计算。
- 首先。
- 为tasks创建一个
-
cancelTasks(stageId)
,取消一个stage的tasks- 调用
SchedulerBackend
的killTask(taskId, executorId, ...)
方法。taskId和executorId在
TaskScheduler
里一直维护着。
- 调用
-
resourceOffer(offers: Seq[Workers])
,这是很重要的一个方法,调用者是SchedulerBacnend
,用途是底层资源SchedulerBackend
把空余的workers资源交给TaskScheduler
。让其依据调度策略为排队的任务分配合理的cpu和内存资源。然后把任务描写叙述列表传回给SchedulerBackend
- 从worker offers里。搜集executor和host的相应关系、active executors、机架信息等等
- worker offers资源列表进行随机洗牌,任务队列里的任务列表依据调度策略进行一次排序
- 遍历每一个taskSet,依照进程本地化、worker本地化、机器本地化、机架本地化的优先级顺序,为每一个taskSet提供可用的cpu核数,看是否满足
- 默认一个task须要一个cpu。设置參数为
"spark.task.cpus=1"
- 为taskSet分配资源,校验是否满足的逻辑,终于在
TaskSetManager
的resourceOffer(execId, host, maxLocality)
方法里 - 满足的话,会生成终于的任务描写叙述。而且调用
DAGScheduler
的taskStarted(task, info)
方法。通知DAGScheduler
,这时候每次会触发DAGScheduler
做一次submitMissingStage
的尝试,即stage的tasks都分配到了资源的话,立即会被提交运行
- 默认一个task须要一个cpu。设置參数为
-
statusUpdate(taskId, taskState, data)
,还有一个很重要的方法,调用者是SchedulerBacnend
,用途是SchedulerBacnend
会将task运行的状态汇报给TaskScheduler
做一些决定- 若
TaskLost
,找到该task相应的executor。从active executor里移除。避免这个executor被分配到其它task继续失败下去。 - task finish包含四种状态:finished, killed, failed, lost。仅仅有finished是成功运行完毕了。
其它三种是失败。
- task成功运行完,调用
TaskResultGetter.enqueueSuccessfulTask(taskSet, tid, data)
,否则调用TaskResultGetter.enqueueFailedTask(taskSet, tid, state, data)
。TaskResultGetter
内部维护了一个线程池,负责异步fetch task运行结果并反序列化。默认开四个线程做这件事,可配參数"spark.resultGetter.threads"=4
。
- 若
TaskResultGetter取task result的逻辑
- 对于success task。假设taskResult里的数据是直接结果数据。直接把data反序列出来得到结果。假设不是。会调用
blockManager.getRemoteBytes(blockId)
从远程获取。假设远程取回的数据是空的,那么会调用
TaskScheduler.handleFailedTask
,告诉它这个任务是完毕了的可是数据是丢失的。否则。取到数据之后会通知
BlockManagerMaster
移除这个block信息,调用TaskScheduler.handleSuccessfulTask
。告诉它这个任务是运行成功的。而且把result data传回去。 - 对于failed task。从data里解析出fail的理由,调用
TaskScheduler.handleFailedTask
。告诉它这个任务失败了,理由是什么。
SchedulerBackend
在TaskScheduler
下层。用于对接不同的资源管理系统,SchedulerBackend
是个接口。须要实现的主要方法例如以下:
def start(): Unit
def stop(): Unit
def reviveOffers(): Unit // 重要方法:SchedulerBackend把自己手头上的可用资源交给TaskScheduler。TaskScheduler依据调度策略分配给排队的任务吗,返回一批可运行的任务描写叙述,SchedulerBackend负责launchTask,即终于把task塞到了executor模型上,executor里的线程池会运行task的run()
def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
throw new UnsupportedOperationException
粗粒度:进程常驻的模式。典型代表是standalone模式,mesos粗粒度模式,yarn
细粒度:mesos细粒度模式
这里讨论粗粒度模式,更好理解:CoarseGrainedSchedulerBackend
。
维护executor相关信息(包含executor的地址、通信端口、host、总核数。剩余核数),手头上executor有多少被注冊使用了。有多少剩余,总共还有多少核是空的等等。
主要职能
- Driver端主要通过actor监听和处理以下这些事件:
-
RegisterExecutor(executorId, hostPort, cores, logUrls)
。这是executor加入的来源,通常worker拉起、重新启动会触发executor的注冊。CoarseGrainedSchedulerBackend
把这些executor维护起来,更新内部的资源信息。比方总核数添加。最后调用一次makeOffer()
,即把手头资源丢给TaskScheduler
去分配一次。返回任务描写叙述回来。把任务launch起来。这个
makeOffer()
的调用会出如今不论什么与资源变化相关的事件中,以下会看到。 -
StatusUpdate(executorId, taskId, state, data)
。task的状态回调。首先,调用TaskScheduler.statusUpdate
上报上去。然后。推断这个task是否运行结束了。结束了的话把executor上的freeCore加回去,调用一次makeOffer()
。 -
ReviveOffers
。这个事件就是别人直接向
SchedulerBackend
请求资源,直接调用makeOffer()
。 -
KillTask(taskId, executorId, interruptThread)
。这个killTask的事件。会被发送给executor的actor,executor会处理KillTask
这个事件。 -
StopExecutors
。通知每一个executor,处理StopExecutor
事件。 -
RemoveExecutor(executorId, reason)
。从维护信息中,那这堆executor涉及的资源数减掉。然后调用TaskScheduler.executorLost()
方法,通知上层我这边有一批资源不能用了,你处理下吧。TaskScheduler
会继续把executorLost
的事件上报给DAGScheduler
,原因是DAGScheduler
关心shuffle任务的output location。DAGScheduler
会告诉BlockManager
这个executor不可用了,移走它,然后把全部的stage的shuffleOutput信息都遍历一遍,移走这个executor,而且把更新后的shuffleOutput信息注冊到MapOutputTracker
上,最后清理下本地的CachedLocations
Map。
-
-
reviveOffers()
方法的实现。直接调用了
makeOffers()
方法,得到一批可运行的任务描写叙述。调用launchTasks
。 -
launchTasks(tasks: Seq[Seq[TaskDescription]])
方法。- 遍历每一个task描写叙述。序列化成二进制。然后发送给每一个相应的executor这个任务信息
- 假设这个二进制信息太大,超过了9.2M(默认的akkaFrameSize 10M 减去 默认 为akka留空的200K)。会出错,abort整个taskSet。并打印提醒增大akka frame size
- 假设二进制数据大小可接受,发送给executor的actor。处理
LaunchTask(serializedTask)
事件。
- 遍历每一个task描写叙述。序列化成二进制。然后发送给每一个相应的executor这个任务信息
Executor
Executor是spark里的进程模型。能够套用到不同的资源管理系统上。与SchedulerBackend
配合使用。
内部有个线程池,有个running tasks map,有个actor,接收上面提到的由SchedulerBackend
发来的事件。
事件处理
-
launchTask
。依据task描写叙述。生成一个
TaskRunner
线程,丢尽running tasks map里。用线程池运行这个TaskRunner
-
killTask
。从running tasks map里拿出线程对象,调它的kill方法。
全文完 :)