3.7 缓存
除了将数据驻留在内存中以外,缓存在RDD中也扮演了另外一个重要的角色。就像之前所说的,创建RDD有两种方式,从存储系统中读取数据或者应用其他现存RDD的转换操作。默认情况下,当一个RDD的操作方法被调用时,Spark会根据它的父RDD来创建这个RDD,这有可能导致父RDD的创建。如此往复,这个过程一直持续到Spark找到根RDD,而后Spark通过从过存储系统读取数据的方式创建根RDD。操作方法被调用一次,上面说的过程就会执行一遍。每次调用操作方法,Spark都会遍历这个调用者RDD的血统树,执行所有的转换操作来创建它。
考虑下面的例子。
尽管上面的代码只调用了一次textFile方法,但是日志文件会被从硬盘中读取两次。这是因为调用了两次操作方法count。在调用errorLogs.count时,日志文件第一次被读取,调用warningLogs.count时,日志文件被再次读取。这只是个简单的例子,现实世界中的应用会有更多的各种转换和操作。
如果一个RDD缓存了,Spark会执行到目前为止的所有转换操作并为这个RDD创建一个检查点。具体来说,这只会在第一次在一个缓存的RDD上调用某操作的时候发生。类似于转换方法,缓存方法也是惰性的。
如果一个应用缓存了RDD,Spark并不是立即执行计算并把它存储在内存中。Spark只有在第一次在缓存的RDD上调用某操作的时候才会将RDD物化在内存中。而且这第一次操作并不会从中受益,后续的操作才会从缓存中受益。因为它们不需要再执行从存储系统中读取数据开始的一系列操作。它们通常都运行得快多了。还有,那些只使用一次数据的应用使用缓存也不会有任何好处。只有那些需要对同样数据做多次迭代的应用才能从缓存中受益。
如果一个应用把RDD缓存在内存中,Spark实际上是把它存储在每个worker节点上执行者的内存中了。每个执行者把它所计算的RDD分区缓存在内存中。
3.7.1 RDD的缓存方法
RDD类提供了两种缓存方法:cache和persist。
cache
cache方法把RDD存储在集群中执行者的内存中。它实际上是将RDD物化在内存中。
下面的例子展示了怎么利用缓存优化上面的例子。
persist
persist是一个通用版的cache方法。它把RDD存储在内存中或者硬盘上或者二者皆有。它的输入参数是存储等级,这是一个可选参数。如果调用persist方法而没有提供参数,那么它的行为类似于cache方法。
persist方法支持下列常见的存储选项。
MEMORY_ONLY:当一个应用把 MEMORY_ONLY作为参数调用persist方法时,Spark会将RDD分区采用反序列化Java对象的方式存储在worker节点的内存中。如果一个RDD分区无法完全载入worker节点的内存中,那么它将在需要时才计算。
DISK_ONLY:如果把DISK_ONLY作为参数调用persist方法,Spark会物化RDD分区,把它们存储在每一个worker节点的本地文件系统中。这个参数可以用于缓存中间的RDD,这样接下来的一系列操作就没必要从根RDD开始计算了。
MEMORY_AND_DISK:这种情况下,Spark会尽可能地把RDD分区存储在内存中,如果有剩余,就把剩余的分区存储在硬盘上。
MEMORY_ONLY_SER:这种情况下,Spark会采用序列化Java对象的方式将RDD分区存储在内存中。一个序列化的Java对象会消耗更少的内存,但是读取是CPU密集型的操作。这个参数是在内存消耗和CPU使用之间做的一个妥协。
MEMORY_AND_DISK_SER:Spark会尽可能地以序列化Java对象的方式将RDD分区存储在内存中。如果有剩余,则剩余的分区会存储在硬盘上。
3.7.2 RDD缓存是可容错的
在分布式环境中可容错性是相当重要的。之前我们就已经知道了当节点出故障的时候Spark是怎么自动把计算作业转移到其他节点的。Spark的RDD机制同样也是可容错的。
即使一个缓存RDD的节点出故障了,Spark应用也不会崩溃。Spark会在另外节点上自动重新创建、缓存出故障的节点中存储的分区。Spark利用RDD的血统信息来重新计算丢失的缓存分区。
3.7.3 缓存内存管理
Spark采用LRU算法来自动管理缓存占用的内存。只有在必要时,Spark才会从缓存占用的内存中移除老的RDD分区。而且,RDD还提供了名为unpersist的方法。应用可以调用这个方法来从缓存占用的内存中手动移除RDD分区。