Spark2.x精通:Checkpoint源码深度剖析

1.checkpoint功能是什么呢?


   对于一个复杂的应用程序,可能中间会涉及到多个RDD的转换操作,计算过程中可能由于某些原因,导致中间关键的计算数据丢失,如果出现上述情况,而集群没有容错机制的话,后面的操作恰恰需要前面的RDD数据时,就需要重新计算一次,这时候就可以启用checkpoint机制,来实现集群的容错和高可用。


2.设置checkpoint目录

    

    启用checkpoint,首先要调用SparkContext的setCheckpointDir()方法,设置一个可靠的文件系统目录(一般是HDFS),我们先看下这个函数实现,代码如下:

   //这传进来路径,这里一般是传hdfs路径  def setCheckpointDir(directory: String) {    //如果我们以集群模式运行任务,而传进来一个本地路径,driver就会在本地构建本地路径,而executor是运行在集群    //各个节点上,这样j就不对了,跑在一个集群上的任务,这个目录必须是一个HDFS路径。    if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {      logWarning("Spark is not running in local mode, therefore the checkpoint directory " +        s"must not be on the local filesystem. Directory '$directory' " +        "appears to be on the local filesystem.")    }    //下面就是在Hdfs上构建一个checkpoint目录    checkpointDir = Option(directory).map { dir =>      val path = new Path(dir, UUID.randomUUID().toString)      val fs = path.getFileSystem(hadoopConfiguration)      fs.mkdirs(path)      fs.getFileStatus(path).getPath.toString    }  }


3.对RDD启用checkpoint机制


    设置checkpoint目录后,可对RDD调用checkpoint()函数,对其启用checkpoint机制,看下代码:

  /**   * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint   * directory set with `SparkContext#setCheckpointDir` and all references to its parent   * RDDs will be removed. This function must be called before any job has been   * executed on this RDD. It is strongly recommended that this RDD is persisted in   * memory, otherwise saving it on a file will require recomputation.   */   //上面几行英文大体意思是:   // 启用checkpoint之后,这个RDD数据以文件形式保存在你设置的文件系统中   // 另外一个就是这个RDD之前的父RDD的数据都会被删除掉,这里要注意  def checkpoint(): Unit = RDDCheckpointData.synchronized {    //首先检查是否设置了checkpoint目录    if (context.checkpointDir.isEmpty) {      throw new SparkException("Checkpoint directory has not been set in the SparkContext")    } else if (checkpointData.isEmpty) {      //这里实例化化一个ReliableRDDCheckpointData类      //这里要注意 只是实例化 而且是懒加载的  并没有真正写数据      checkpointData = Some(new ReliableRDDCheckpointData(this))    }  }

    类ReliableRDDCheckpointData继承自RDDCheckpointData类,这个父类有一个变量标识这checkpoint的状态,初始化状态为Initialized,代码如下:

private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])  extends Serializable {
 import CheckpointState._
  // 初始状态为Initialized   //它一共有三个状态,依次是Initialized->CheckpointingInProgress->Checkpointed  protected var cpState = Initialized


4.checkpoint的触发执行


    这个RDD什么时候才执行数据的写入呢,只有在action操作提交job时候,才会调用doCheckpoint()函数,先看runJob()代码如下:

  def runJob[T, U: ClassTag](      rdd: RDD[T],      func: (TaskContext, Iterator[T]) => U,      partitions: Seq[Int],      resultHandler: (Int, U) => Unit): Unit = {    if (stopped.get()) {      throw new IllegalStateException("SparkContext has been shutdown")    }    val callSite = getCallSite    val cleanedFunc = clean(func)    logInfo("Starting job: " + callSite.shortForm)    if (conf.getBoolean("spark.logLineage", false)) {      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)    }    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)    progressBar.foreach(_.finishAll())    //这里才调用rdd的checkpoint(),从最后一个rdd,依次向前找需要进行checkpoint的RDD    //然后才会将其写入设置的checkpoint目录    rdd.doCheckpoint()  }

    继续看RDD的doCheckpoint()函数,代码如下:

//这里会从job的最后一个RDD开始,依次x向前递归调用父RDD,然后对启用了checkpoint的RDD//将其数据写入指定的checkpoint目录private[spark] def doCheckpoint(): Unit = {    RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {      if (!doCheckpointCalled) {        doCheckpointCalled = true        if (checkpointData.isDefined) {          if (checkpointAllMarkedAncestors) {            // TODO We can collect all the RDDs that needs to be checkpointed, and then checkpoint            // them in parallel.            // Checkpoint parents first because our lineage will be truncated after we            // checkpoint ourselves            dependencies.foreach(_.rdd.doCheckpoint())          }          //这里才是真正的数据写入          checkpointData.get.checkpoint()        } else {          dependencies.foreach(_.rdd.doCheckpoint())        }      }    }  }

    上面是调用的类RDDCheckpointData中的checkpoint()函数,代码如下:

final def checkpoint(): Unit = {    // Guard against multiple threads checkpointing the same RDD by    // atomically flipping the state of this RDDCheckpointData    RDDCheckpointData.synchronized {      //更新状态  从Initialized到CheckpointingInProgress      if (cpState == Initialized) {        cpState = CheckpointingInProgress      } else {        return      }    }   //这里调用ReliableRDDCheckpointData,真正的将数据写入指定文件系统目录    val newRDD = doCheckpoint()
    // 更新Checkpoint状态从CheckpointingInProgress到Checkpointed,    //最后清空RDD依赖    RDDCheckpointData.synchronized {      cpRDD = Some(newRDD)      cpState = Checkpointed      rdd.markCheckpointed()    }  }

 

5.继续跟踪ReliableRDDCheckpointData类的doCheckpoint()函数,代码如下:

protected override def doCheckpoint(): CheckpointRDD[T] = {    //这里就是写入文件文件系统的操作了。里面代码很简单 自己去看下    val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)
    // 用于控制是否在对RDD引用超出范围时删除其对应的checkout文件    if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {      rdd.context.cleaner.foreach { cleaner =>        cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)      }    }    logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")    newRDD  }


6.上篇文章再讲CacheManager的时候讲了什么时候会去读checkpoint中的数据,这里不再讲了。

    

    至此,checkpoint相关的内容讲解完毕,我们整个Spark源码剖析部分基本也结束了,感谢关注!!!


上一篇:Flink checkpoint失败


下一篇:Flink的容错机制,状态一致性基本概念