Spark 2.4.0 Source Code: SparkContext

  Before diving into SparkContext, a quick review of Scala syntax: a Scala class has one primary constructor and possibly several auxiliary constructors. Auxiliary constructors are defined with "def this", while any code in the class body that is not inside a method or an auxiliary constructor belongs to the primary constructor, and the primary constructor runs every time an object is instantiated. For example:

  

class Person(val name: String, val age: Int) {
    println("primary constructor called")

    // auxiliary constructor: it must call the primary constructor first
    def this(name: String) = {
        this(name, 0)
    }

    def introduce(): Unit = {
        println("name :" + name + "-age :" + age)
    }
}

val jack = new Person("jack", 2)

jack.introduce()

  Output:

  primary constructor called

  name :jack-age :2

 

  Now to the main topic: the more important parts of SparkContext's primary constructor:

try{
        ...
        // Create the Spark execution environment (cache, map output tracker, etc)
        _env = createSparkEnv(_conf, isLocal, listenerBus)
        SparkEnv.set(_env)
        
        ...
        
        // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
        // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
        _heartbeatReceiver = env.rpcEnv.setupEndpoint(
        HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

        
        // Create and start the scheduler
        val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
        _schedulerBackend = sched
        _taskScheduler = ts
        _dagScheduler = new DAGScheduler(this)
        _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
    
        // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
        // constructor
        _taskScheduler.start()
    }
    

  First:

_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)

_heartbeatReceiver = env.rpcEnv.setupEndpoint(
  HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

   Here SparkContext creates the SparkEnv (which contains the rpcEnv) and registers a heartbeat Endpoint, HeartbeatReceiver, with the rpcEnv via setupEndpoint.
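
   As a rough sketch of what such an endpoint looks like: setupEndpoint registers an object implementing Spark's internal RpcEndpoint / ThreadSafeRpcEndpoint trait and hands back an RpcEndpointRef that other code can send() or ask() messages on. These traits are private[spark], so the snippet below is illustrative only (it would have to sit under the org.apache.spark package to compile); Ping and DemoEndpoint are made-up names, not part of Spark.

import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}

// Hypothetical message type, only for this sketch
case object Ping

// A minimal endpoint: receive handles one-way send(), receiveAndReply handles ask()
class DemoEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
  override def receive: PartialFunction[Any, Unit] = {
    case Ping => println("got a one-way ping")
  }

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case Ping => context.reply(true)   // answers ref.ask[Boolean](Ping)
  }
}

// Registration mirrors what SparkContext does for HeartbeatReceiver:
//   val ref = env.rpcEnv.setupEndpoint("demo", new DemoEndpoint(env.rpcEnv))
//   ref.ask[Boolean](Ping)   // returns a Future[Boolean]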

  Next:

 val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)

  This calls SparkContext's own createTaskScheduler method, which returns a (SchedulerBackend, TaskScheduler) tuple:

private def createTaskScheduler(
      sc: SparkContext,
      master: String,
      deployMode: String): (SchedulerBackend, TaskScheduler) = {
    import SparkMasterRegex._

    // When running locally, don't try to re-execute tasks on failure.
    val MAX_LOCAL_TASK_FAILURES = 1

    master match {
      // ...

      // standalone deploy mode
      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        // call the initialize method
        scheduler.initialize(backend)
        (backend, scheduler)

      // ...
    }
  }

  Inside the method, the master parameter is pattern-matched to determine the deploy mode, and a different (SchedulerBackend, TaskScheduler) pair is created for each one. Taking standalone mode as an example: a TaskSchedulerImpl and a StandaloneSchedulerBackend are created from the arguments, TaskSchedulerImpl's initialize method is called, and finally the tuple is returned.
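
  The match itself is an ordinary Scala regex extractor; in the source, SparkMasterRegex.SPARK_REGEX is """spark://(.*)""".r. A small self-contained sketch of how a standalone master URL gets destructured:

// Same pattern as SparkMasterRegex.SPARK_REGEX
val SPARK_REGEX = """spark://(.*)""".r

"spark://host1:7077,host2:7077" match {
  case SPARK_REGEX(sparkUrl) =>
    // sparkUrl == "host1:7077,host2:7077"; re-prefix each master with spark://
    val masterUrls = sparkUrl.split(",").map("spark://" + _)
    println(masterUrls.mkString(", "))   // spark://host1:7077, spark://host2:7077
  case _ =>
    println("some other deploy mode (local, yarn, mesos, ...)")
}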

   scheduler.initialize(backend) creates a different schedulableBuilder depending on the schedulingMode; the schedulableBuilder wraps the tree of Schedulables (the pool hierarchy) and is responsible for scheduling TaskSets:

def initialize(backend: SchedulerBackend) {
    this.backend = backend
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
          s"$schedulingMode")
      }
    }
    schedulableBuilder.buildPools()
  }
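
  The schedulingMode used above comes from the spark.scheduler.mode setting (FIFO by default). As a short usage sketch, an application that wants the FAIR scheduler would configure it like this:

import org.apache.spark.{SparkConf, SparkContext}

// Default is FIFO -> FIFOSchedulableBuilder; FAIR -> FairSchedulableBuilder + pools
val conf = new SparkConf()
  .setAppName("scheduler-mode-demo")
  .setMaster("local[*]")
  .set("spark.scheduler.mode", "FAIR")

val sc = new SparkContext(conf)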

  Next come two more lines:

  _dagScheduler = new DAGScheduler(this)

  This creates the DAGScheduler, the high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG (directed acyclic graph) of stages for each job, keeps track of which RDDs and stage outputs have been materialized (written to disk or memory), and finds a minimum-cost schedule to run the job. It then submits stages as TaskSets to the underlying TaskSchedulerImpl, which runs them on the cluster. Besides building the DAG of stages, the DAGScheduler also determines the preferred locations for running each task based on the current cache status and passes them to the underlying TaskSchedulerImpl. It also handles failures caused by lost shuffle output files, in which case old stages may have to be resubmitted. Failures within a stage that are not caused by lost shuffle files are handled by the TaskScheduler, which retries each task a number of times and only cancels the whole stage if the retries fail.
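
  To make that concrete, here is a tiny job against an existing SparkContext sc: the shuffle introduced by reduceByKey splits the DAG into a ShuffleMapStage and a ResultStage, and the collect() action is what hands the job to the DAGScheduler.

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val counts = pairs.reduceByKey(_ + _)   // shuffle boundary -> new stage
println(counts.toDebugString)           // prints the lineage, showing the ShuffledRDD boundary
counts.collect()                        // action: DAGScheduler builds and submits the stages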

  _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

   With the TaskScheduler and SchedulerBackend created above, this tells the HeartbeatReceiver endpoint (via an ask) that the TaskScheduler has now been set.

   Finally:

  _taskScheduler.start()

   TaskSchedulerImpl's start() method calls the SchedulerBackend's start() method, so in standalone mode what actually runs is StandaloneSchedulerBackend.start():

override def start() {
    super.start()

    // SPARK-21159. The scheduler backend should only try to connect to the launcher when in client
    // mode. In cluster mode, the code that submits the application to the Master needs to connect
    // to the launcher instead.
    if (sc.deployMode == "client") {
      launcherBackend.connect()
    }

    // ... parameter setup elided (command, webUrl, maxCores, etc.)
    
    val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
    client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()
    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
    waitForRegistration()
    launcherBackend.setState(SparkAppHandle.State.RUNNING)
  }
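
  The values packed into appDesc above come from the application's SparkConf. A rough usage sketch (property names as in the standard configuration docs) of a standalone submission that feeds those fields:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://host1:7077")      // standalone master -> masters
  .setAppName("my-app")                 // -> sc.appName
  .set("spark.executor.memory", "2g")   // -> sc.executorMemory
  .set("spark.cores.max", "8")          // -> maxCores

val sc = new SparkContext(conf)         // kicks off everything described in this post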

  Two objects are created in the start() method above: ApplicationDescription and StandaloneAppClient. ApplicationDescription, as the name suggests, describes the application, for example the resources it needs; StandaloneAppClient is responsible for communicating with the Spark cluster on behalf of the application. SchedulerBackend's start() ultimately calls the client's start(), shown below:

def start() {
    // Just launch an rpcEndpoint; it will call back into the listener.
    endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
  }

   This just launches a ClientEndpoint rpc endpoint, which calls back into the listener. For more on how Spark's RPC layer works, see https://www.cnblogs.com/superhedantou/p/7570692.html

  

   Finally, a rough flow diagram of the whole process:

[Flow diagram: Spark 2.4.0 SparkContext initialization]

 
