3.12 Spark Environment Update
The initialization of SparkContext may affect its environment, so the environment needs to be updated. The code is as follows.
postEnvironmentUpdate()
postApplicationStart()
During SparkContext initialization, if the spark.jars property is set, the jars it specifies are added by the addJar method to the path given by httpFileServer's jarDir variable, and the files specified by spark.files are added by the addFile method to the path given by httpFileServer's fileDir variable. See Listing 3-49.
Listing 3-49 Handling dependency files
val jars: Seq[String] =
  conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
val files: Seq[String] =
  conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten

// Add each JAR given through the constructor
if (jars != null) {
  jars.foreach(addJar)
}
if (files != null) {
  files.foreach(addFile)
}
The addFile and addJar methods of httpFileServer are shown in Listing 3-50.
Listing 3-50 HttpFileServer provides access to dependency files
def addFile(file: File) : String = {
  addFileToDir(file, fileDir)
  serverUri + "/files/" + file.getName
}

def addJar(file: File) : String = {
  addFileToDir(file, jarDir)
  serverUri + "/jars/" + file.getName
}

def addFileToDir(file: File, dir: File) : String = {
  if (file.isDirectory) {
    throw new IllegalArgumentException(s"$file cannot be a directory.")
  }
  Files.copy(file, new File(dir, file.getName))
  dir + "/" + file.getName
}
The implementation of postEnvironmentUpdate is shown in Listing 3-51. Its processing steps are as follows:
1) Call SparkEnv's environmentDetails method to gather the details that ultimately describe the environment: JVM parameters, Spark properties, system properties, the classpath, and so on; see Listing 3-52.
2) Create a SparkListenerEnvironmentUpdate event and post it to listenerBus. This event is consumed by EnvironmentListener and ultimately determines what is displayed on the EnvironmentPage of the web UI.
Listing 3-51 Implementation of postEnvironmentUpdate
private def postEnvironmentUpdate() {
  if (taskScheduler != null) {
    val schedulingMode = getSchedulingMode.toString
    val addedJarPaths = addedJars.keys.toSeq
    val addedFilePaths = addedFiles.keys.toSeq
    val environmentDetails =
      SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, addedFilePaths)
    val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
    listenerBus.post(environmentUpdate)
  }
}
Listing 3-52 Implementation of environmentDetails
private[spark]
def environmentDetails(
    conf: SparkConf,
    schedulingMode: String,
    addedJars: Seq[String],
    addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = {

  import Properties._
  val jvmInformation = Seq(
    ("Java Version", s"$javaVersion ($javaVendor)"),
    ("Java Home", javaHome),
    ("Scala Version", versionString)
  ).sorted
  val schedulerMode =
    if (!conf.contains("spark.scheduler.mode")) {
      Seq(("spark.scheduler.mode", schedulingMode))
    } else {
      Seq[(String, String)]()
    }
  val sparkProperties = (conf.getAll ++ schedulerMode).sorted
  // System properties that are not java classpaths
  val systemProperties = Utils.getSystemProperties.toSeq
  val otherProperties = systemProperties.filter { case (k, _) =>
    k != "java.class.path" && !k.startsWith("spark.")
  }.sorted
  // Class paths including all added jars and files
  val classPathEntries = javaClassPath
    .split(File.pathSeparator)
    .filterNot(_.isEmpty)
    .map((_, "System Classpath"))
  val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
  val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted
  Map[String, Seq[(String, String)]](
    "JVM Information" -> jvmInformation,
    "Spark Properties" -> sparkProperties,
    "System Properties" -> otherProperties,
    "Classpath Entries" -> classPaths)
}
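Any listener registered on listenerBus can observe the event posted in Listing 3-51. The following is a minimal sketch (the EnvPrinter class and its registration are hypothetical) of the same pattern EnvironmentListener follows to feed the EnvironmentPage:

import org.apache.spark.scheduler.{SparkListener, SparkListenerEnvironmentUpdate}

// Hypothetical listener: prints the four sections assembled by environmentDetails.
class EnvPrinter extends SparkListener {
  override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
    for ((section, entries) <- event.environmentDetails) {
      println(s"== $section ==")
      entries.foreach { case (k, v) => println(s"    $k = $v") }
    }
  }
}

// Registration, e.g.: sc.addSparkListener(new EnvPrinter)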
The postApplicationStart method is straightforward: it simply posts a SparkListenerApplicationStart event to listenerBus. The code is as follows.
listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId), startTime, sparkUser))
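For reference, in the Spark version analyzed here this event is a simple case class carrying exactly the four values passed above (sketched from the source; later versions add further fields):

case class SparkListenerApplicationStart(
    appName: String,
    appId: Option[String],
    time: Long,
    sparkUser: String)
  extends SparkListenerEvent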