Master.preStart(){ webUi.bind()
context.system.scheduler.schedule( millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut) //定时任务检测是否有DEAD WORKER需要移除 case CheckForWorkerTimeOut => {
timeOutDeadWorkers()
} /** Check for, and remove, any timed-out workers */
def timeOutDeadWorkers() {
...
if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + ) * WORKER_TIMEOUT)) {
workers -= worker
}
} }
Worker.preStart(){ override def preStart() {
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
webUi.bind()
registerWithMaster() //注册该Worker到Master
} def tryRegisterAllMasters() {
for (masterUrl <- masterUrls) {
logInfo("Connecting to master " + masterUrl + "...")
val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
}
} }
Master.scala case RegisterWorker(){ persistenceEngine.addWorker(worker)
sender ! RegisteredWorker(masterUrl, masterWebUiUrl) //向Worker发送Worker注册成功事件 schedule() //调度部分后续章节分析 }
Worker.scala case RegisteredWorker(){ registered = true
context.system.scheduler.schedule( millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat) //Worker注册成功后,定时向Master发送心跳信息 } case SendHeartbeat =>
masterLock.synchronized {
if (connected) { master ! Heartbeat(workerId) }
}
Master.scala case Heartbeat(workerId) => {
idToWorker.get(workerId) match {
case Some(workerInfo) =>
workerInfo.lastHeartbeat = System.currentTimeMillis() //更新该worker的上次发送心跳信息的时间
case None =>
logWarning("Got heartbeat from unregistered worker " + workerId)
}
}
=================如上步骤完成了Worker到Master的连接===============================================
SparkContext启动时:
SparkContext.createTaskScheduler() ==>new SparkDeploySchedulerBackend() ==>创建AppClient并启动 ==>ClientActor.preStart():registerWithMaster(){actor ! RegisterApplication(appDescription)} //向Master发起RegisterApplication事件
Master.scala case RegisterApplication(description) { val app = createApplication(description, sender) registerApplication(app)
persistenceEngine.addApplication(app)
sender ! RegisteredApplication(app.id, masterUrl) //向Worker发起RegisteredApplication事件表示该Application已经注册成功
schedule() //调度部分后续章节分析
}
=======================如上步骤完成了Application到Master的连接===============================================
小结:
1、Master的主要功能:
1)Master Leader选举;
2)Master对Worker、Application等的管理(接收worker的注册并管理所有的worker,接收client提交的application,(FIFO)调度等待的application并向worker提交);
2、Worker的主要功能:
1)通过RegisterWorker注册到Master;
2)定时发送心跳给Master;
3)根据master发送的application配置进程环境,并启动StandaloneExecutorBackend
3、运行spark-shell:
1)ClientActor通过RegisterApplication注册到Master;
2)Master收到RegisterApplication后,通过scheduler方法进行调度,如有满足要求的Worker,则发送LaunchExecutor给相应的Worker;