Spark分析之Master、Worker以及Application三者之间如何建立连接

Master.preStart(){

  webUi.bind()
  context.system.scheduler.schedule( millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut) //定时任务检测是否有DEAD WORKER需要移除   case CheckForWorkerTimeOut => {
    timeOutDeadWorkers()
  }   /** Check for, and remove, any timed-out workers */  
  def timeOutDeadWorkers() {
    ...
    if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + ) * WORKER_TIMEOUT)) {
      workers -= worker
    }
  } }
Worker.preStart(){

  override def preStart() {
    webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
    webUi.bind()
    registerWithMaster() //注册该Worker到Master
  }   def tryRegisterAllMasters() {
    for (masterUrl <- masterUrls) {
      logInfo("Connecting to master " + masterUrl + "...")
      val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
      actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
    }
  } }
Master.scala

case RegisterWorker(){  

  persistenceEngine.addWorker(worker)
  sender ! RegisteredWorker(masterUrl, masterWebUiUrl) //向Worker发送Worker注册成功事件   schedule() //调度部分后续章节分析 }
Worker.scala

case RegisteredWorker(){

  registered = true
  context.system.scheduler.schedule( millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat) //Worker注册成功后,定时向Master发送心跳信息 } case SendHeartbeat =>
  masterLock.synchronized {
  if (connected) { master ! Heartbeat(workerId) }
}
Master.scala

case Heartbeat(workerId) => {
  idToWorker.get(workerId) match {
  case Some(workerInfo) =>
    workerInfo.lastHeartbeat = System.currentTimeMillis() //更新该worker的上次发送心跳信息的时间
  case None =>
    logWarning("Got heartbeat from unregistered worker " + workerId)
  }
}

=================如上步骤完成了Worker到Master的连接===============================================

SparkContext启动时:

SparkContext.createTaskScheduler()

  ==>new SparkDeploySchedulerBackend()

    ==>创建AppClient并启动

      ==>ClientActor.preStart():registerWithMaster(){actor ! RegisterApplication(appDescription)}  //向Master发起RegisterApplication事件
Master.scala

case RegisterApplication(description) {

  val app = createApplication(description, sender)

  registerApplication(app)
  persistenceEngine.addApplication(app)
  sender ! RegisteredApplication(app.id, masterUrl) //向Worker发起RegisteredApplication事件表示该Application已经注册成功
  schedule() //调度部分后续章节分析
}

=======================如上步骤完成了Application到Master的连接===============================================

小结:

1、Master的主要功能:

  1)Master Leader选举;

  2)Master对Worker、Application等的管理(接收worker的注册并管理所有的worker,接收client提交的application,(FIFO)调度等待的application并向worker提交);

2、Worker的主要功能:

  1)通过RegisterWorker注册到Master;

  2)定时发送心跳给Master;

3)根据master发送的application配置进程环境,并启动StandaloneExecutorBackend

3、运行spark-shell:

  1)ClientActor通过RegisterApplication注册到Master;

  2)Master收到RegisterApplication后,通过scheduler方法进行调度,如有满足要求的Worker,则发送LaunchExecutor给相应的Worker;

上一篇:javaCV开发详解之5:录制音频(录制麦克风)到本地文件/流媒体服务器(基于javax.sound、javaCV-FFMPEG)


下一篇:javaCV开发详解之8:转封装在rtsp转rtmp流中的应用(无须转码,更低的资源消耗)