Spark 持久化机制

Spark速度非常快的原因之一,就是在不同操作中在内存中持久化(或缓存)一个数据集。当持久化一个RDD后,每一个节点都将把计算的分片结果保存在内存或磁盘中,并在对此数据集(或者衍生出的数据集)进行的其他动作(action)中重用。这使得后续的动作变得更加迅速(通常快10倍)。RDD相关的持久化和缓存,是Spark最重要的特征之一。可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键。

其中cache是将中间结果缓存到内存中,而checkpoint是将运算结果缓存到指定的文件目录,一般为HDFS。

1.缓存。

cache底层调用persist算子。

 

/** Persist this RDD  with  the   default   storage level ('MEMORY_ONLY') */
def  persist() : this.type = persist(StorageLevel.MEMORY)

/** Persist this RDD  with  the   default   storage level ('MEMORY_ONLY') */
def  cache() : this.type = persist()

 

可以看出cache是一种特殊的persist方法。默认将RDD结果缓存到内存。

persist可以指定存储级别,是否序列化,以及存储副本数等参数

Spark 持久化机制

注意:

1.最常使用的是MEMORY_ONLY 和MEMORY_AND_DISK。

2.一个RDD多次触发Action才有意义

3.如果将数据缓存到内存,内存不够,以分区为单位,只缓存部分分区的数据

4.cache和persist()都是懒加载,只有当action触发时才会执行

5.cache和persist算子严格来说不是Transformation算子,因为没有生成新的RDD,只是标记当前RDD被cache或persist

 

 

2.检查点checkpoint

checkpoint机制是spark中另外一种持久化机制。适用于非常复杂的计算,例如机器学习和迭代计算,为了避免因为丢失数据而重复计算,可以将宝贵的中间结果保存到HDFS中,确保中间结果安全。

sc.setCheckPointDir()

在调用RDD的checkpoint方法之前,要先指定checkpoint的目录,一般为HDFS存储路径,这个目录可以是事先不存在的。第一次触发Action的时候,才会触发Checkpoint,这时会额外触发一个job,这个job的目的就是为了将中间结果存储到HDFS中。

注意:

1.如果RDD做了checkpoint,那这个RDD之前的依赖关系就不在使用了。

2.多次触发Action,checkpoint才有意义。

3.checkpoint严格的说和cache,persist一样,不是transformation算子,只是标记当前rdd要做transformation。

4.如果checkpoint之前,对rdd进行了cache,可以避免重复的计算,如果有cache的数据,优先使用cache的数据,没有再使用checkpoint。

 

上一篇:flink 检查点(checkpoint)配置与使用


下一篇:断点续训