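1. What is checkpointing for?
A complex application may chain many RDD transformations. During the computation, intermediate data that later steps depend on can be lost, for example when an executor fails. Without a fault-tolerance mechanism, Spark would have to recompute that data from its lineage whenever a later step needs it, which is expensive for long chains. Checkpointing addresses this. It writes the RDD's data to reliable storage so that later steps can read it back instead of recomputing it, which improves the application's fault tolerance.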
2. Setting the checkpoint directory
To enable checkpointing, first call SparkContext.setCheckpointDir() with a directory on a reliable file system (usually HDFS). Its implementation:
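// The argument is the checkpoint directory; in practice this is usually an HDFS path
def setCheckpointDir(directory: String): Unit = {
  // If the job runs on a cluster but a local path is passed in, the driver creates the
  // directory on its own local disk, while executors run on other nodes and cannot see it.
  // On a cluster the directory should therefore be on shared storage such as HDFS.
  // Note that Spark only logs a warning here; it does not reject the path.
  if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
    logWarning("Spark is not running in local mode, therefore the checkpoint directory " +
      s"must not be on the local filesystem. Directory '$directory' " +
      "appears to be on the local filesystem.")
  }
  // Create a uniquely named (random UUID) sub-directory under the given directory, e.g. on HDFS
  checkpointDir = Option(directory).map { dir =>
    val path = new Path(dir, UUID.randomUUID().toString)
    val fs = path.getFileSystem(hadoopConfiguration)
    fs.mkdirs(path)
    fs.getFileStatus(path).getPath.toString
  }
}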
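The following sketch shows what setCheckpointDir does. It is a simplified model that uses java.nio.file in place of Hadoop's FileSystem, so it only handles local paths. CheckpointDirSketch, isLocalPath and the isLocalMode flag are hypothetical names. isLocalPath only approximates Utils.nonLocalPaths: it treats a path with no scheme, or with a file: or local: scheme, as local.

```scala
import java.net.URI
import java.nio.file.{Files, Path, Paths}
import java.util.UUID

// Hypothetical, simplified model of SparkContext.setCheckpointDir.
// java.nio.file stands in for Hadoop's FileSystem, so only local paths really work here.
object CheckpointDirSketch {
  // Rough approximation of Utils.nonLocalPaths: no scheme, "file" or "local" counts as local
  def isLocalPath(dir: String): Boolean = {
    val scheme = new URI(dir).getScheme
    scheme == null || scheme == "file" || scheme == "local"
  }

  // Returns the per-application sub-directory that was created
  def setCheckpointDir(dir: String, isLocalMode: Boolean): String = {
    if (!isLocalMode && isLocalPath(dir)) {
      // Like Spark, only warn; the path is still used
      println(s"WARN: checkpoint directory '$dir' appears to be on the local filesystem")
    }
    // A random UUID sub-directory, as in the Spark code above
    val path: Path = Paths.get(dir, UUID.randomUUID().toString)
    Files.createDirectories(path)
    path.toString
  }
}
```

Two calls with the same base directory produce two different sub-directories. The random UUID is what keeps applications that share one checkpoint root from clashing.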
3. Enabling checkpointing on an RDD
Once the directory is set, call checkpoint() on an RDD to mark it for checkpointing. The code:
/**
 * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
 * directory set with `SparkContext#setCheckpointDir` and all references to its parent
 * RDDs will be removed. This function must be called before any job has been
 * executed on this RDD. It is strongly recommended that this RDD is persisted in
 * memory, otherwise saving it on a file will require recomputation.
 */
// In short, the comment above says:
// once checkpointing is enabled, this RDD's data is saved as files in the configured directory,
// and the RDD's references to its parent RDDs are removed, so its lineage is truncated.
// (The parent RDDs' data is not deleted by this; only the references to them are dropped.)
// checkpoint() must be called before any job has run on this RDD, and the RDD should be
// persisted first, otherwise writing it out recomputes it.
def checkpoint(): Unit = RDDCheckpointData.synchronized {
  // First check that a checkpoint directory has been set
  if (context.checkpointDir.isEmpty) {
    throw new SparkException("Checkpoint directory has not been set in the SparkContext")
  } else if (checkpointData.isEmpty) {
    // This only instantiates ReliableRDDCheckpointData. It is lazy:
    // no data is written at this point
    checkpointData = Some(new ReliableRDDCheckpointData(this))
  }
}
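The key point is that checkpoint() only records intent. The sketch below models that behavior with two hypothetical classes, MarkContext and MarkedRDD, which stand in for SparkContext and RDD. A planned output path (a plain String) replaces ReliableRDDCheckpointData. The sketch also locks on the RDD object itself rather than on Spark's global RDDCheckpointData lock.

```scala
// Hypothetical stand-ins for SparkContext and RDD; not Spark classes.
final class MarkContext {
  var checkpointDir: Option[String] = None
}

final class MarkedRDD(val id: Int, val context: MarkContext) {
  // Stands in for checkpointData; a planned output path replaces ReliableRDDCheckpointData
  var checkpointData: Option[String] = None

  def checkpoint(): Unit = this.synchronized {
    if (context.checkpointDir.isEmpty) {
      throw new IllegalStateException("Checkpoint directory has not been set")
    } else if (checkpointData.isEmpty) {
      // Only record where the data will go; nothing is written yet
      checkpointData = Some(s"${context.checkpointDir.get}/rdd-$id")
    }
  }
}
```

Calling checkpoint() before a directory is set throws. After a successful call, any later call is a no-op, so the first recorded target sticks even if the directory changes afterwards.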
ReliableRDDCheckpointData extends RDDCheckpointData. The parent class holds a variable, cpState, that tracks the checkpoint state. Its initial value is Initialized:
private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
  extends Serializable {

  import CheckpointState._

  // The initial state is Initialized.
  // There are three states, entered in this order:
  // Initialized -> CheckpointingInProgress -> Checkpointed
  protected var cpState = Initialized

  // ... (remaining members omitted)
}
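The three states only ever move forward. The sketch below models that lifecycle. It uses a sealed trait rather than the scala.Enumeration that Spark uses. CkptState and CkptTransitions are hypothetical names.

```scala
// Hypothetical model of the three checkpoint states and their single legal order.
sealed trait CkptState
case object Initialized extends CkptState
case object CheckpointingInProgress extends CkptState
case object Checkpointed extends CkptState

object CkptTransitions {
  // The only forward step allowed from each state; Checkpointed is terminal
  def next(s: CkptState): Option[CkptState] = s match {
    case Initialized             => Some(CheckpointingInProgress)
    case CheckpointingInProgress => Some(Checkpointed)
    case Checkpointed            => None
  }

  // Follows next() from Initialized until the terminal state, collecting each state visited
  def lifecycle: List[CkptState] = {
    def loop(s: CkptState): List[CkptState] = s :: next(s).map(loop).getOrElse(Nil)
    loop(Initialized)
  }
}
```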
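4. Triggering the checkpoint
So when is the RDD's data actually written? Only when an action submits a job. After that job finishes, runJob() calls doCheckpoint() on the job's final RDD. Here is runJob():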
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  // Only now, after the job has finished, is doCheckpoint() called on the job's final RDD.
  // It walks back through the lineage looking for RDDs marked for checkpointing
  // and writes them to the checkpoint directory
  rdd.doCheckpoint()
}
Next comes RDD.doCheckpoint():
// Starting from the job's final RDD, recurse into the parent RDDs and write the data of
// each RDD marked for checkpointing to the checkpoint directory.
// The walk stops at the first marked RDD it meets on each path; that RDD's ancestors
// are visited as well only when checkpointAllMarkedAncestors is enabled.
private[spark] def doCheckpoint(): Unit = {
  RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
    if (!doCheckpointCalled) {
      doCheckpointCalled = true
      if (checkpointData.isDefined) {
        if (checkpointAllMarkedAncestors) {
          // TODO We can collect all the RDDs that needs to be checkpointed, and then checkpoint
          // them in parallel.
          // Checkpoint parents first because our lineage will be truncated after we
          // checkpoint ourselves
          dependencies.foreach(_.rdd.doCheckpoint())
        }
        // This is where the data is actually written
        checkpointData.get.checkpoint()
      } else {
        dependencies.foreach(_.rdd.doCheckpoint())
      }
    }
  }
}
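The following simplified model shows which RDDs actually get written after a job. The hypothetical types Node and LineageWalk stand in for RDD and SparkContext, and the job itself is omitted. Node.doCheckpoint keeps the same branch structure as RDD.doCheckpoint(), including the doCheckpointCalled guard. The `written` buffer only records the order in which RDDs would be checkpointed.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical model of the lineage walk; Node and LineageWalk are not Spark APIs.
final class Node(val name: String, val parents: Seq[Node]) {
  private var marked = false
  private var doCheckpointCalled = false

  def markForCheckpoint(): Unit = marked = true

  // Same branches as RDD.doCheckpoint(); appends to `written` instead of writing files
  def doCheckpoint(allMarkedAncestors: Boolean, written: ListBuffer[String]): Unit = {
    if (!doCheckpointCalled) {
      doCheckpointCalled = true
      if (marked) {
        // Parents first, because our own lineage is cut once we are checkpointed
        if (allMarkedAncestors) parents.foreach(_.doCheckpoint(allMarkedAncestors, written))
        written += name
      } else {
        parents.foreach(_.doCheckpoint(allMarkedAncestors, written))
      }
    }
  }
}

object LineageWalk {
  // Like SparkContext.runJob: the job runs first, then the checkpoint walk starts at the final node
  def runJob(finalNode: Node, allMarkedAncestors: Boolean): List[String] = {
    val written = ListBuffer.empty[String]
    // (job execution omitted)
    finalNode.doCheckpoint(allMarkedAncestors, written)
    written.toList
  }
}
```

Consider a chain a -> b -> c -> d in which b and d are marked. By default only d is written, because the walk stops at d. With allMarkedAncestors set, b is written first and then d. A second job on the same final RDD writes nothing, because doCheckpointCalled is already set.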
The call checkpointData.get.checkpoint() above goes to checkpoint() in RDDCheckpointData:
final def checkpoint(): Unit = {
  // Guard against multiple threads checkpointing the same RDD by
  // atomically flipping the state of this RDDCheckpointData
  RDDCheckpointData.synchronized {
    // Move the state from Initialized to CheckpointingInProgress
    if (cpState == Initialized) {
      cpState = CheckpointingInProgress
    } else {
      return
    }
  }
  // Calls ReliableRDDCheckpointData.doCheckpoint(), which actually writes
  // the data to the checkpoint directory
  val newRDD = doCheckpoint()
  // Move the state from CheckpointingInProgress to Checkpointed,
  // then clear the RDD's dependencies (truncate its lineage)
  RDDCheckpointData.synchronized {
    cpRDD = Some(newRDD)
    cpState = Checkpointed
    rdd.markCheckpointed()
  }
}
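The pattern here is a check-and-flip under a lock, with the slow write done outside the lock. The sketch below shows the same pattern. It assumes a hypothetical GuardedCheckpoint class whose write function stands in for doCheckpoint() and whose string states mirror CheckpointState. It avoids `return` inside the synchronized block and uses a boolean flag instead.

```scala
// Hypothetical model of RDDCheckpointData.checkpoint(); GuardedCheckpoint is not a Spark class.
// `write` stands in for doCheckpoint(); the strings mirror CheckpointState.
final class GuardedCheckpoint(write: () => String) {
  private val lock = new Object
  private var state: String = "Initialized"
  private var result: Option[String] = None

  def checkpoint(): Unit = {
    // Atomically claim the checkpoint: only the first caller still sees Initialized
    val claimed = lock.synchronized {
      if (state == "Initialized") { state = "CheckpointingInProgress"; true } else false
    }
    if (claimed) {
      // The expensive write runs outside the lock
      val written = write()
      lock.synchronized {
        result = Some(written)
        state = "Checkpointed"
      }
    }
  }

  def currentState: String = lock.synchronized(state)
  def checkpointResult: Option[String] = lock.synchronized(result)
}
```

Suppose several threads call checkpoint() at the same time. Exactly one of them wins the Initialized -> CheckpointingInProgress flip and runs the write. The others find a different state and return immediately, without waiting for the write to finish.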
5. Following the call into ReliableRDDCheckpointData.doCheckpoint():
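protected override def doCheckpoint(): CheckpointRDD[T] = {
  // Write the RDD's data to the checkpoint file system. writeRDDToCheckpointDirectory is short
  // and worth reading; note that it launches a separate job over the RDD, which is why the
  // RDD should be persisted beforehand to avoid computing it twice.
  val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)
  // Controls whether the checkpoint files are deleted once the RDD reference goes out of scope
  if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
    rdd.context.cleaner.foreach { cleaner =>
      cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
    }
  }
  logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
  newRDD
}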
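To picture what lands on disk, here is a minimal local-file sketch of the write step. It assumes a layout of one file per partition, named part-00000, part-00001 and so on, under a directory rdd-<id> inside the checkpoint directory. That naming is assumed to follow Spark's. Spark itself writes through Hadoop's FileSystem with a serializer and runs the write as a separate job, whereas this sketch writes plain text lines on the local disk. PartitionWriter, writeRDD and readPartition are hypothetical names.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

// Hypothetical, simplified model of writing an RDD's partitions to a checkpoint directory.
// Plain text on the local disk stands in for Hadoop FileSystem plus Spark's serializer.
object PartitionWriter {
  // One file per partition: part-00000, part-00001, ...
  def partFileName(index: Int): String = "part-%05d".format(index)

  // Writes every partition under <checkpointDir>/rdd-<rddId> and returns that directory
  def writeRDD(checkpointDir: Path, rddId: Int, partitions: Seq[Seq[Int]]): Path = {
    val rddDir = checkpointDir.resolve(s"rdd-$rddId")
    Files.createDirectories(rddDir)
    partitions.zipWithIndex.foreach { case (data, i) =>
      val text = data.mkString("\n")
      Files.write(rddDir.resolve(partFileName(i)), text.getBytes(StandardCharsets.UTF_8))
    }
    rddDir
  }

  // Reads one partition back, as a checkpointed RDD would when serving that partition
  def readPartition(rddDir: Path, index: Int): Seq[Int] = {
    val bytes = Files.readAllBytes(rddDir.resolve(partFileName(index)))
    val text = new String(bytes, StandardCharsets.UTF_8)
    if (text.isEmpty) Seq.empty else text.split("\n").toSeq.map(_.toInt)
  }
}
```

Once the partitions are on disk, a lost partition can be re-read from its file instead of being recomputed from the original lineage. That is the point of the whole mechanism.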
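6. The previous article, in its discussion of CacheManager, covered when checkpointed data is read back, so that is not repeated here.
That wraps up checkpointing, and with it this series on the Spark source code. Thanks for following along!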