上一篇已经将 Application 注册到了 master 上了,在 master 收到注册消息后会进行一系列操作,最后调用 schedule 方法。
这个 schedule 方法会去做两件事,一件事是给等待调度的 driver 分配资源,另一件事是给等待调度的 application 去分配资源启动 Executor。
给 application 分配资源启动 Executor 的代码最终会调用一个方法:launchExecutor(是 Master 中的代码)。
在 lauchExecutor 方法中会先向 worker 发送 lauchExecutor 消息,然后会向 driver 发送 executor 已经启动的消息。
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
// 向 worker 发送 LaunchExecutor 消息
worker.endpoint.send(LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
// 向 driver 发送 ExecutorAdded 的消息
exec.application.driver.send(
ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
下面看 worker 端收到 launchExecutor 消息后是怎么处理的。
同样的在 receive 的模式匹配中找到该消息的匹配,可以看到做了这些事情:
1,先判断发消息的 master 是否是 alive 状态,如果是才会继续执行;
2,创建 executor 的工作目录和本地临时目录;
3,将 master 发来的消息封装为 ExecutorRunner 对象,然后调用其 start 方法启动线程;
4,向 master 发送消息,告诉当前 executor 的状态;
// 模式匹配,是 LaunchExecutor 消息
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
// 如果发送消息的 master 不是 active 的则不执行
if (masterUrl != activeMasterUrl) {
logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
} else {
try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
// 创建 executor 的工作目录
val executorDir = new File(workDir, appId + "/" + execId)
if (!executorDir.mkdirs()) {
throw new IOException("Failed to create directory " + executorDir)
}
// 通过 SPARK_EXECUTOR_DIRS 环境变量配置创建 Executor 的本地目录
// Worker 会在当前 application 执行结束后删除这个目录
val appLocalDirs = appDirectories.getOrElse(appId,
Utils.getOrCreateLocalRootDirs(conf).map { dir =>
val appDir = Utils.createDirectory(dir, namePrefix = "executor")
Utils.chmod700(appDir)
appDir.getAbsolutePath()
}.toSeq)
appDirectories(appId) = appLocalDirs
// 将接收到的 application 中 Executor 的相关信息封装为一个 ExecutorRunner 对象
val manager = new ExecutorRunner(
appId,
execId,
appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
cores_,
memory_,
self,
workerId,
host,
webUi.boundPort,
publicAddress,
sparkHome,
executorDir,
workerUri,
conf,
appLocalDirs, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
// 启动这个线程
manager.start()
// 更新 worker 的资源利用情况
coresUsed += cores_
memoryUsed += memory_
// 给 master 回复消息
sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
} catch {
case e: Exception =>
logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
if (executors.contains(appId + "/" + execId)) {
executors(appId + "/" + execId).kill()
executors -= appId + "/" + execId
}
sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
Some(e.toString), None))
}
}
这里主要看 ExecutorRunner 调用 start 之后做了什么。
实际上是创建了一个线程,线程运行时会去执行 fetchAndRunExecutor 这个方法。
private[worker] def start() {
// 创建线程
workerThread = new Thread("ExecutorRunner for " + fullId) {
override def run() { fetchAndRunExecutor() }
}
// 启动线程
workerThread.start()
// Shutdown hook that kills actors on shutdown.
shutdownHook = ShutdownHookManager.addShutdownHook { () =>
// It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
// be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
if (state == ExecutorState.RUNNING) {
state = ExecutorState.FAILED
}
killProcess(Some("Worker shutting down")) }
}
fetchAndRunExecutor 这个方法将接收到的信息做如下一些事情:
1,创建 ProcessBuilder,准备执行本地命令;
2,为 ProcessBuilder 创建执行目录,设置环境变量;
3,启动 ProcessBuilder,生成 Executor 进程,这个进程的名称一般为:CoarseGrainedExecutorBackend;
4,重定向输出流和错误文件流;
5,等待获取 executor 的退出码,然后发送给 worker;
private def fetchAndRunExecutor() {
try {
// 创建 ProcessBuilder
val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
memory, sparkHome.getAbsolutePath, substituteVariables)
// 封装指令
val command = builder.command()
val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
logInfo(s"Launch command: $formattedCommand")
builder.directory(executorDir)
builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
// In case we are running this from within the Spark Shell, avoid creating a "scala"
// parent process for the executor command
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
// Add webUI log urls
val baseUrl =
if (conf.getBoolean("spark.ui.reverseProxy", false)) {
s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
} else {
s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
}
builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
// 启动进程
process = builder.start()
val header = "Spark Executor Command: %s\n%s\n\n".format(
formattedCommand, "=" * 40)
// 重定向标准输出流
// Redirect its stdout and stderr to files
val stdout = new File(executorDir, "stdout")
stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
// 重定向错误输出流
val stderr = new File(executorDir, "stderr")
Files.write(header, stderr, StandardCharsets.UTF_8)
stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
// Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
// or with nonzero exit code
// 等待退出码
val exitCode = process.waitFor()
state = ExecutorState.EXITED
val message = "Command exited with code " + exitCode
// 将退出码发送给 worker
worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
} catch {
case interrupted: InterruptedException =>
logInfo("Runner thread for executor " + fullId + " interrupted")
state = ExecutorState.KILLED
killProcess(None)
case e: Exception =>
logError("Error running executor", e)
state = ExecutorState.FAILED
killProcess(Some(e.toString))
}
}
至此,Executor 是启动完成了。