Deep Understanding of Spark: Core Ideas and Source Code Analysis. 3.10 Creating and Starting ExecutorAllocationManager

3.10 Creating and Starting ExecutorAllocationManager

ExecutorAllocationManager manages the Executors that have been allocated. The code that creates and starts ExecutorAllocationManager is as follows.

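// Created only when spark.dynamicAllocation.enabled is true (default: false)
private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
    if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
        Some(new ExecutorAllocationManager(this, listenerBus, conf))
    } else {
        None
    }
// start() runs only if the manager was created; foreach on None is a no-op
executorAllocationManager.foreach(_.start())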
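The creation code relies on a common Scala idiom. An optional component is modeled as an `Option`, and `foreach` calls `start()` only when the component exists.

The sketch below reproduces that shape without Spark, under the following assumptions:

- `AllocationManagerStub` and `createAllocationManager` are hypothetical names used for illustration.
- A plain `Map` stands in for `SparkConf`.
- `getBoolean` only mimics the role of `conf.getBoolean(key, default)`.

```scala
// Hypothetical stand-in for ExecutorAllocationManager; it only records whether start() ran.
final class AllocationManagerStub {
  @volatile var started = false
  def start(): Unit = started = true
}

// Mimics conf.getBoolean(key, default) on a plain Map (an assumption of this sketch).
def getBoolean(conf: Map[String, String], key: String, default: Boolean): Boolean =
  conf.get(key).map(_.trim.toBoolean).getOrElse(default)

// Same shape as the SparkContext code: Some(manager) only when the flag is true.
def createAllocationManager(conf: Map[String, String]): Option[AllocationManagerStub] =
  if (getBoolean(conf, "spark.dynamicAllocation.enabled", false)) Some(new AllocationManagerStub)
  else None
```

With an empty configuration, `createAllocationManager(Map.empty).foreach(_.start())` does nothing. With `"spark.dynamicAllocation.enabled" -> "true"`, the stub is created and started. This pattern avoids null checks at every call site.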

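By default, ExecutorAllocationManager is not created; set the property spark.dynamicAllocation.enabled to true to enable it. ExecutorAllocationManager reads settings such as the minimum and maximum number of Executors for dynamic allocation and the number of Tasks each Executor can run, and it validates these settings. Its start method adds an ExecutorAllocationListener to listenerBus. ExecutorAllocationListener listens to the events on listenerBus and uses them to drive the dynamic addition and removal of Executors. In addition, a dedicated thread repeatedly adds Executors, iterates over the existing Executors, and kills and removes those that have timed out. ExecutorAllocationListener is implemented like any other SparkListener, so it is not discussed further here. The key code of ExecutorAllocationManager is shown in Listing 3-47.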
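Here is a simplified sketch of reading and validating dynamic-allocation settings. It assumes the following:

- A plain `Map` replaces `SparkConf`.
- `AllocationSettings`, the default values, and the exact checks are illustrative, not Spark's actual validation code.
- The property names are Spark's documented keys.
- Tasks per Executor is derived as executor cores divided by CPUs per task.

```scala
// Hypothetical container for the values an allocation manager derives from its configuration.
final case class AllocationSettings(minExecutors: Int, maxExecutors: Int, tasksPerExecutor: Int)

object AllocationSettings {
  // Reads an Int from a plain Map standing in for SparkConf.
  private def getInt(conf: Map[String, String], key: String, default: Int): Int =
    conf.get(key).map(_.trim.toInt).getOrElse(default)

  // Reads the settings and validates them; Left carries the first error found.
  // Defaults and checks here are illustrative, not Spark's exact rules.
  def fromConf(conf: Map[String, String]): Either[String, AllocationSettings] = {
    val min = getInt(conf, "spark.dynamicAllocation.minExecutors", 0)
    val max = getInt(conf, "spark.dynamicAllocation.maxExecutors", Int.MaxValue)
    val cores = getInt(conf, "spark.executor.cores", 1)
    val cpusPerTask = getInt(conf, "spark.task.cpus", 1)
    if (min < 0) Left(s"minExecutors must be >= 0, got $min")
    else if (max <= 0) Left(s"maxExecutors must be > 0, got $max")
    else if (min > max) Left(s"minExecutors ($min) must not exceed maxExecutors ($max)")
    else if (cpusPerTask <= 0) Left(s"spark.task.cpus must be > 0, got $cpusPerTask")
    else {
      // An Executor with `cores` cores can run cores / cpusPerTask tasks at the same time.
      val tasksPerExecutor = cores / cpusPerTask
      if (tasksPerExecutor == 0) Left("spark.executor.cores must not be less than spark.task.cpus")
      else Right(AllocationSettings(min, max, tasksPerExecutor))
    }
  }
}
```

Returning `Either` keeps the sketch free of exceptions. A real manager would more likely fail fast by throwing when a check does not pass.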

Listing 3-47 Key code of ExecutorAllocationManager

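// Interval between two polling rounds, in milliseconds
private val intervalMillis: Long = 100

// Clock used to time Executor addition and removal (replaceable in tests)
private var clock: Clock = new RealClock

// Listener for Spark events that affect the allocation policy
private val listener = new ExecutorAllocationListener

def start(): Unit = {
    listenerBus.addListener(listener)
    startPolling()
}

// Starts a daemon thread that calls schedule() every intervalMillis
private def startPolling(): Unit = {
    val t = new Thread {
        override def run(): Unit = {
            while (true) {
                try {
                    schedule()
                } catch {
                    // Log and keep polling so one failure does not stop allocation
                    case e: Exception => logError("Exception in dynamic executor allocation thread!", e)
                }
                Thread.sleep(intervalMillis)
            }
        }
    }
    t.setName("spark-dynamic-executor-allocation")
    t.setDaemon(true)
    t.start()
}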
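The listing does not show schedule() itself. The sketch below is a simplified, hypothetical model of the work the text attributes to the polling thread: adding Executors while tasks stay backlogged, and killing Executors that stay idle past a timeout.

The following parts are assumptions of this sketch, not Spark's API:

- `MiniAllocator`, `SimpleClock`, and `ManualClock`.
- The timeout parameters.
- The one-Executor-per-round policy.

`ManualClock` plays the role of the pluggable `clock` field, so timeouts can be tested without sleeping.

```scala
import scala.collection.mutable

// Minimal clock abstraction, playing the role of the pluggable `clock` field in the listing.
trait SimpleClock { def getTimeMillis(): Long }
object SystemClock extends SimpleClock { def getTimeMillis(): Long = System.currentTimeMillis() }

// Test clock whose time only moves when advance() is called.
final class ManualClock(start: Long = 0L) extends SimpleClock {
  private var now = start
  def getTimeMillis(): Long = synchronized { now }
  def advance(ms: Long): Unit = synchronized { now += ms }
}

// Hypothetical, simplified allocator; names and policy are illustrative only.
final class MiniAllocator(clock: SimpleClock, idleTimeoutMs: Long, backlogTimeoutMs: Long, maxExecutors: Int) {
  private val removeTimes = mutable.Map.empty[String, Long] // executorId -> idle deadline
  private var addTime: Option[Long] = None                  // when to request another Executor
  private var nextId = 0
  val executors = mutable.Set.empty[String]
  val killed = mutable.Buffer.empty[String]

  // Hooks a listener would call when it observes scheduler events.
  def onSchedulerBacklogged(): Unit = synchronized {
    if (addTime.isEmpty) addTime = Some(clock.getTimeMillis() + backlogTimeoutMs)
  }
  def onSchedulerQueueEmpty(): Unit = synchronized { addTime = None }
  def onExecutorIdle(id: String): Unit = synchronized {
    if (!removeTimes.contains(id)) removeTimes(id) = clock.getTimeMillis() + idleTimeoutMs
  }
  def onExecutorBusy(id: String): Unit = synchronized { removeTimes.remove(id) }

  // One polling round: add an Executor if the backlog outlasted its timeout,
  // then kill and forget every Executor whose idle deadline has passed.
  def schedule(): Unit = synchronized {
    val now = clock.getTimeMillis()
    addTime.foreach { t =>
      if (now >= t && executors.size < maxExecutors) {
        nextId += 1
        executors += s"exec-$nextId"
        addTime = Some(now + backlogTimeoutMs) // keep adding while the backlog persists
      }
    }
    val expired = removeTimes.collect { case (id, deadline) if now >= deadline => id }.toList
    expired.foreach { id =>
      removeTimes.remove(id)
      executors -= id
      killed += id
    }
  }

  // Polling thread like startPolling(), but with a stop flag instead of while (true).
  @volatile private var running = true
  def startPolling(intervalMillis: Long = 100): Thread = {
    val t = new Thread {
      override def run(): Unit =
        while (running) {
          try schedule()
          catch { case e: Exception => e.printStackTrace() }
          Thread.sleep(intervalMillis)
        }
    }
    t.setName("mini-dynamic-executor-allocation")
    t.setDaemon(true)
    t.start()
    t
  }
  def stopPolling(): Unit = running = false
}
```

The polling thread and the listener hooks only touch state inside `synchronized` blocks. The stop flag is a choice made for this sketch; the original relies on the thread being a daemon, so it never blocks JVM exit.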

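As described in Section 3.4.1, listenerBus has a built-in thread, listenerThread, which continually takes event objects from eventQueue and invokes the listeners' handler methods. This thread runs only after listenerBus's start method is called: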

  listenerBus.start()
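To make the relationship between eventQueue, listenerThread, and start() concrete, here is a simplified sketch in the spirit of Section 3.4.1. Events posted before start() simply wait in the queue until the thread begins draining it.

`MiniListenerBus`, `BusEvent`, `BusListener`, and `StopSignal` are hypothetical names. Spark's real listener bus differs in details such as queue capacity, error handling, and shutdown.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingQueue}

// Hypothetical event and listener types standing in for SparkListenerEvent / SparkListener.
sealed trait BusEvent
final case class StageSubmitted(stageId: Int) extends BusEvent
final case class TaskEnd(taskId: Long) extends BusEvent
case object StopSignal extends BusEvent // internal marker that tells the thread to exit

trait BusListener { def onEvent(event: BusEvent): Unit }

final class MiniListenerBus {
  // Events posted before start() wait here until the thread drains them.
  private val eventQueue = new LinkedBlockingQueue[BusEvent]()
  private val listeners = new CopyOnWriteArrayList[BusListener]()

  // Counterpart of listenerThread: takes events off the queue and calls every listener.
  private val listenerThread = new Thread("mini-listener-bus") {
    override def run(): Unit = {
      var running = true
      while (running) {
        eventQueue.take() match {
          case StopSignal => running = false
          case event =>
            val it = listeners.iterator()
            while (it.hasNext) {
              // One failing listener must not kill the dispatch thread.
              try it.next().onEvent(event)
              catch { case e: Exception => e.printStackTrace() }
            }
        }
      }
    }
  }
  listenerThread.setDaemon(true)

  def addListener(listener: BusListener): Unit = { listeners.add(listener); () }
  def post(event: BusEvent): Unit = eventQueue.put(event)
  def start(): Unit = listenerThread.start()
  // Processes everything posted so far, then waits for the thread to exit.
  def stop(): Unit = { eventQueue.put(StopSignal); listenerThread.join() }
}
```

A listener added with `addListener` and an event posted before `start()` are both handled once the thread runs. This mirrors why SparkContext must call `listenerBus.start()` for ExecutorAllocationListener to receive any events.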
