1. Setting up createTaskScheduler in SparkContext
case "yarn-standalone" | "yarn-cluster" =>
if (master == "yarn-standalone") {
logWarning(
"\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.")
}
val scheduler = try {
val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
val cons = clazz.getConstructor(classOf[SparkContext])
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
} catch {
// TODO: Enumerate the exact reasons why it can fail
// But irrespective of it, it means we cannot proceed !
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}
val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
scheduler.initialize(backend) //调用实现类的initialize函数
scheduler
In TaskSchedulerImpl.scala:
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
    }
  }
  schedulableBuilder.buildPools()
}
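initialize wires the backend into the scheduler and builds the pool hierarchy. The schedulingMode it switches on is not hard-coded: TaskSchedulerImpl reads it from the spark.scheduler.mode property, which defaults to FIFO. A minimal usage sketch from the application side (the app name is hypothetical):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("fair-scheduling-demo")   // hypothetical app name
  .set("spark.scheduler.mode", "FAIR")  // makes initialize() pick FairSchedulableBuilder
val sc = new SparkContext(conf)

With FAIR mode, FairSchedulableBuilder.buildPools additionally reads pool definitions from the allocation file (spark.scheduler.allocation.file); with FIFO, buildPools has nothing to do.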
2. submitTasks
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
    activeTaskSets(taskSet.id) = manager
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient memory")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()
}
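For context, submitTasks is the entry point the DAGScheduler calls once it has built a stage's missing tasks. A simplified sketch of the call site in DAGScheduler.submitMissingTasks (condensed; the exact arguments vary slightly across Spark versions):

taskScheduler.submitTasks(
  new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))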
3. reviveOffers in CoarseGrainedSchedulerBackend
override def reviveOffers() {
  driverActor ! ReviveOffers // send the ReviveOffers message to CoarseGrainedSchedulerBackend's driverActor
}
case ReviveOffers =>
  makeOffers()

// Make fake resource offers on all executors
def makeOffers() {
  launchTasks(scheduler.resourceOffers(
    executorHost.toArray.map { case (id, host) => new WorkerOffer(id, host, freeCores(id)) }))
}
/**
 * Represents free resources available on an executor.
 */
private[spark]
case class WorkerOffer(executorId: String, host: String, cores: Int)
/**
 * Called by cluster manager to offer resources on slaves. We respond by asking our active task
 * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
 * that tasks are balanced across the cluster.
 */
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  SparkEnv.set(sc.env)

  // Mark each slave as alive and remember its hostname
  for (o <- offers) {
    executorIdToHost(o.executorId) = o.host
    if (!executorsByHost.contains(o.host)) {
      executorsByHost(o.host) = new HashSet[String]()
      executorAdded(o.executorId, o.host)
    }
  }

  // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
  val shuffledOffers = Random.shuffle(offers)
  // Build a list of tasks to assign to each worker.
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
  }

  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  var launchedTask = false
  for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
    do {
      launchedTask = false
      for (i <- 0 until shuffledOffers.size) {
        val execId = shuffledOffers(i).executorId
        val host = shuffledOffers(i).host
        if (availableCpus(i) >= CPUS_PER_TASK) {
          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
            tasks(i) += task
            val tid = task.taskId
            taskIdToTaskSetId(tid) = taskSet.taskSet.id
            taskIdToExecutorId(tid) = execId
            activeExecutorIds += execId
            executorsByHost(host) += execId
            availableCpus(i) -= CPUS_PER_TASK
            assert(availableCpus(i) >= 0)
            launchedTask = true
          }
        }
      }
    } while (launchedTask)
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
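The nested loop above is the heart of load balancing: for each TaskSet (in pool order) and each locality level (most local first), every executor with spare cores is offered one task per round, until a full round launches nothing. A self-contained toy that isolates the round-robin shape (Offer and the task names are stand-ins, not Spark API):

import scala.collection.mutable.{ArrayBuffer, Queue}

object RoundRobinDemo extends App {
  case class Offer(executorId: String, cores: Int) // stand-in for WorkerOffer

  val offers        = Seq(Offer("exec-1", 2), Offer("exec-2", 2))
  val availableCpus = offers.map(_.cores).toArray
  val assigned      = offers.map(_ => ArrayBuffer[String]())
  val pending       = Queue("task-0", "task-1", "task-2", "task-3")

  // Same shape as the do/while above: each round gives every executor with a
  // free core at most one task, so tasks spread evenly across executors.
  var launchedTask = true
  while (launchedTask) {
    launchedTask = false
    for (i <- offers.indices if availableCpus(i) >= 1 && pending.nonEmpty) {
      assigned(i) += pending.dequeue()
      availableCpus(i) -= 1
      launchedTask = true
    }
  }
  println(assigned) // List(ArrayBuffer(task-0, task-2), ArrayBuffer(task-1, task-3))
}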
4. launchTasks
// Launch tasks returned by a set of resource offers
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
    executorActor(task.executorId) ! LaunchTask(task)
  }
}
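The cores subtracted here come back through the same actor: when an executor reports that a task has reached a terminal state, the DriverActor returns the cores and re-offers them. A condensed sketch of the corresponding StatusUpdate branch (the real code re-offers only the freed executor):

case StatusUpdate(executorId, taskId, state, data) =>
  scheduler.statusUpdate(taskId, state, data.value)
  if (TaskState.isFinished(state)) {
    freeCores(executorId) += scheduler.CPUS_PER_TASK
    makeOffers()
  }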
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem)
  extends SchedulerBackend with Logging
{
  // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
  var totalCoreCount = new AtomicInteger(0)
  val conf = scheduler.sc.conf
  private val timeout = AkkaUtils.askTimeout(conf)

  class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
    private val executorActor = new HashMap[String, ActorRef]
    private val executorAddress = new HashMap[String, Address]
    private val executorHost = new HashMap[String, String]
    private val freeCores = new HashMap[String, Int]
    private val totalCores = new HashMap[String, Int]
    private val addressToExecutorId = new HashMap[Address, String]
// Driver to executors
case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage
private[spark] class TaskDescription(
    val taskId: Long,
    val executorId: String,
    val name: String,
    val index: Int, // Index within this task's TaskSet
    _serializedTask: ByteBuffer)
  extends Serializable {

  // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer
  private val buffer = new SerializableBuffer(_serializedTask)

  def serializedTask: ByteBuffer = buffer.value

  override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
}
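SerializableBuffer lives in org.apache.spark.util, and the idea is small enough to sketch. Below is a simplified stand-alone version (not Spark's actual implementation): since ByteBuffer is not Serializable, the wrapper writes the buffer's bytes out by hand during Java serialization and rebuilds the buffer on read.

import java.io.{ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

class SerializableBufferSketch(@transient var buffer: ByteBuffer) extends Serializable {
  def value: ByteBuffer = buffer

  private def writeObject(out: ObjectOutputStream): Unit = {
    val bytes = new Array[Byte](buffer.remaining())
    buffer.duplicate().get(bytes) // duplicate() so the original position is untouched
    out.writeInt(bytes.length)
    out.write(bytes)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    val bytes = new Array[Byte](in.readInt())
    in.readFully(bytes)
    buffer = ByteBuffer.wrap(bytes)
  }
}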
5. When CoarseGrainedSchedulerBackend receives an executor's registration, it records the executor
def receive = {
  case RegisterExecutor(executorId, hostPort, cores) =>
    Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
    if (executorActor.contains(executorId)) {
      sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
    } else {
      logInfo("Registered executor: " + sender + " with ID " + executorId)
      sender ! RegisteredExecutor(sparkProperties)
      executorActor(executorId) = sender
      executorHost(executorId) = Utils.parseHostPort(hostPort)._1
      totalCores(executorId) = cores
      freeCores(executorId) = cores
      executorAddress(executorId) = sender.path.address
      addressToExecutorId(sender.path.address) = executorId
      totalCoreCount.addAndGet(cores)
      makeOffers()
    }
The executor first registers with CoarseGrainedSchedulerBackend, and CoarseGrainedSchedulerBackend then sends (serialized) tasks to that executor.
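The handshake uses a small family of cluster messages. Simplified declarations matching the fields used in the quoted code (the exact shapes vary across Spark versions):

sealed trait CoarseGrainedClusterMessage extends Serializable

// Executor -> driver: announce itself and its core count
case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
  extends CoarseGrainedClusterMessage

// Driver -> executor: registration accepted; ship the spark properties
case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
  extends CoarseGrainedClusterMessage

// Driver -> executor: registration rejected (e.g. duplicate executor ID)
case class RegisterExecutorFailed(message: String)
  extends CoarseGrainedClusterMessage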
6. CoarseGrainedExecutorBackend communicates with CoarseGrainedSchedulerBackend.
private[spark] class CoarseGrainedExecutorBackend(
    driverUrl: String,
    executorId: String,
    hostPort: String,
    cores: Int,
    sparkProperties: Seq[(String, String)])
  extends Actor with ActorLogReceive with ExecutorBackend with Logging {

  Utils.checkHostPort(hostPort, "Expected hostport")

  var executor: Executor = null
  var driver: ActorSelection = null

  override def preStart() {
    logInfo("Connecting to driver: " + driverUrl)
    driver = context.actorSelection(driverUrl)
    driver ! RegisterExecutor(executorId, hostPort, cores) // register with the driver
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  }

  override def receiveWithLogging = {
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      // Make this host instead of hostPort ?
      executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties,
        false)

    case RegisterExecutorFailed(message) =>
      logError("Slave registration failed: " + message)
      System.exit(1)

    case LaunchTask(data) => // the task arrives here
      if (executor == null) {
        logError("Received LaunchTask command but executor was null")
        System.exit(1)
      } else {
        val ser = SparkEnv.get.closureSerializer.newInstance()
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        executor.launchTask(this, taskDesc.taskId, taskDesc.name, taskDesc.serializedTask)
      }
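The return path is symmetric and only one line: the executor backend implements ExecutorBackend.statusUpdate by forwarding task results to the driver, which is what feeds the StatusUpdate branch sketched under step 4 (taken from the same class, lightly condensed):

override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
  driver ! StatusUpdate(executorId, taskId, state, data)
}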
7. executor.launchTask
def launchTask(
    context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {
  val tr = new TaskRunner(context, taskId, taskName, serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}
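Here runningTasks is a ConcurrentHashMap and threadPool a cached daemon thread pool, so each task gets its own worker thread. The pattern in isolation, as a self-contained toy (ToyTaskRunner is a stand-in for Spark's TaskRunner):

import java.nio.ByteBuffer
import java.util.concurrent.{ConcurrentHashMap, Executors}

object LaunchTaskDemo extends App {
  val runningTasks = new ConcurrentHashMap[Long, Runnable]
  val threadPool   = Executors.newCachedThreadPool()

  // Stand-in for Spark's TaskRunner: the real one deserializes the task,
  // runs it, and reports status back through the ExecutorBackend.
  class ToyTaskRunner(taskId: Long, serializedTask: ByteBuffer) extends Runnable {
    override def run(): Unit = {
      println(s"running task $taskId on ${Thread.currentThread().getName}")
      runningTasks.remove(taskId)
    }
  }

  val tr = new ToyTaskRunner(0L, ByteBuffer.allocate(0))
  runningTasks.put(0L, tr)
  threadPool.execute(tr)
  threadPool.shutdown()
}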
To be continued in the next installment.