Spark中的checkpoint的简单介绍

为什么要用checkpoint呢?

checkpoint的意思就是建立检查点,类似于快照,比如,在spark计算中,计算流程DAG很长,要是将整个DAG计算完成并得出结果,需要很长时间,在这等待时间中突然中间数据丢失,spark就会根据RDD的依赖关系从头到尾开始计算一遍,这样会很费性能的,怎么解决呢?这就需要用到缓存了,我们可以将中间的计算结过通过cache或者persist方式放到内存中,这样也不一定保证数据不会丢失,如果存储的内存除了问题,也是会导致spark重新根据RDD计算的,所以就有了checkpint。
其中checkpoint的作用就是将DAG中比较重要的中间数据做一个检查点,将结果存储到一个高可用的地方(通常呢这个地方就是hdfs里面)。

示例:sc.textFile(“hdfspath”).flatMap(.split(" ")).map((,1)).reduceByKey(+).saveAsTextFile(“hdfspath”)
1.在textFile读取hdfs的时候就会先创建一个HadoopRDD,其中这个RDD是去读取hdfs的数据key为偏移量value为一行数据,因为通常来讲偏移量没有太大的作用所以然后会将HadoopRDD转化为MapPartitionsRDD,这个RDD只保留了hdfs的数据

2.flatMap 产生一个RDD MapPartitionsRDD

3.map 产生一个RDD MapPartitionsRDD

4.reduceByKey 产生一个RDD ShuffledRDD

5.saveAsTextFile 产生一个RDD MapPartitionsRDD

可以在测试环境中直接查看RDD的依赖:rdd.toDebugString方法或者去webUI

scala> val rdd = sc.textFile("hdfs://192.168.1.101:9000/checkpoint0610/c1a51ee9-1daf-4169-991e-b290f88bac20/rdd-0/part-00000").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[29] at reduceByKey at <console>:27
scala> rdd.toDebugString
res3: String = 
(2) ShuffledRDD[29] at reduceByKey at <console>:27 []
 +-(2) MapPartitionsRDD[28] at map at <console>:27 []
    |  MapPartitionsRDD[27] at flatMap at <console>:27 []
    |  hdfs://lijie:9000/checkpoint0727/c1a51ee9-1daf-4169-991e-b290f88bac20/rdd-0/part-00000 MapPartitionsRDD[26] at textFile at <console>:27 []
    |  hdfs://lijie:9000/checkpoint0727/c1a51ee9-1daf-4169-991e-b290f88bac20/rdd-0/part-00000 HadoopRDD[25] at textFile at <console>:27 []

val sc: SparkContext = SparkContext.getOrCreate(conf)
sc.setCheckpointDir("file:///F:/checkpointFile")  //设置检查点保存的文件名
val rdd: RDD[(Int, String)] = sc.parallelize(Array((1,"a"),(2,"b")))
rdd.checkpoint()    //设置检查点
rdd.foreach(println)
println(rdd.count())
println(rdd.isCheckpointed)  //判断是否设置检查点
println(rdd.getCheckpointFile)  //获取检查点所在文件目录

(2,b)
(1,a)
2
true
Some(file:/F:/checkpointFile/87962c20-cd22-52ee-b01c-2a1d09090c3b/rdd-0)

上一篇:【krpano】二维码自动生成插件(源码+介绍+预览)


下一篇:Spark之DAG