目录
- 一、RDD 持久化介绍
- 二、RDD 持久化级别
- 三、持久化级别选择
- 四、删除持久化数据
- 五、RDD cache 和 persist
- 六、RDD checkpoint
- 七、DataSet cache 和 persist
一、RDD 持久化
因为 Spark 程序执行的特性,即延迟执行和基于 Lineage 最大化的 pipeline,当 Spark 中由于对某个 RDD 的 Action 操作触发了作业时,会基于 Lineage 从后往前推,找到该 RDD 的源头 RDD,然后从前往后计算出结果。
很明显,如果对某个 RDD 执行了多次 Transformation 和 Action 操作,每次 Action 操作出发了作业时都会重新从源头 RDD 出计算一遍来获得 RDD,再对这个 RDD 执行相应的操作。当 RDD 本身计算特别复杂和耗时时,这种方式性能是非常差的,此时必须考虑对计算结果的数据进行持久化。
数据持久化(或称为缓存)就是将计算出来的 RDD 根据配置的持久化级别,保存在内存或磁盘中,以后每次对该 RDD 进行算子操作时,都会直接从内存或者磁盘中提取持久化的 RDD 数据,然后执行算子操作,而不会从源头处重新计算一遍该 RDD。
二、RDD 持久化级别
Spark 的持久化级别有如下几种:
持久化级别 | 含义 |
---|---|
MEMORY_ONLY | 将 RDD 数据保存在内存中,如果内存不够存放则数据有可能有部分不会进行持久化,在瑕疵对该 RDD 执行算子操作时,没有被持久化的数据需要从源头重新计算一遍。这是默认的持久化策略,使用 cache() 方法使用的就是这种持久化策略。 |
MEMORY_AND_DISK | 优先尝试将数据保存在内存中,如果内存不够存放所有数据,会将数据写入到磁盘文件中,下次对该 RDD 执行算子操作时,持久化在内存和磁盘文件中的数据会被读取出来使用。 |
MEMORY_ONLY_SER | 基本含义同 MEMORY_ONLY。唯一的区别是会将 RDD 数据进行序列化,将 RDD 的每个 partition 序列化为一个字节数据,这种方式更加节省内存,避免持久化的数据占用过多内存导致频繁 GC。 |
MEMORY_AND_DISK_SER | 基本含义同 MEMORY_AND_DISK。唯一的区别是会将 RDD 数据进行序列化, RDD 的每个 partition 会被序列化成一个字节数据,种方式更加节省内存,避免持久化的数据占用过多内存导致频繁 GC。 |
DISK_ONLY | 使用未序列化的 Java 对象格式,将数据全部写入磁盘文件中。 |
OFF_HEAP | 将 RDD 以序列化的方式存在在 Alluxio 中(曾用名 Tachyon),而不是 Executor 的内存中,减少垃圾回收的压力,当 RDD 存储在 Alluxio 上时,Executors 的崩溃不会造成 RDD 数据的丢失。 |
MEMORY_ONLY_2, MEMORY_ONLY_SER_2, MEMORY_AND_DISK_2, MEMORY_AND_DIST_SER_2, DISK_ONLY_2, 等等 |
后缀有_2的持久化策略,表示将每个持久化的数据都复制一个副本,并保存在其他节点上,这种基于副本的持久化机制主要用于容错,如果单个节点持久化的数据丢失了,还可以从其他节点的副本上获取该 RDD 数据。 |
阅读 Spark 2.1.0 源码,RDD 持久化级别在 StorageLevel 类中有详细介绍,我们来详细看看。
//位置:/org/apache/spark/storage/StorageLevel.scala
/**
* Various [[org.apache.spark.storage.StorageLevel]] defined and utility functions for creating
* new storage levels.
*/
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
这里列出了 12 种 RDD 缓存级别,每个缓存级别后面都 new 了一个 StorageLevel类的构造函数,什么意思呢?我么可以看看其构造函数。
// 位置:org/apache/spark/storage/StorageLevel.scala
class StorageLevel private(
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)
extends Externalizable {
// TODO: Also add fields for caching priority, dataset ID, and flushing.
private def this(flags: Int, replication: Int) {
this((flags & 8) != 0, (flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, replication)
}
def this() = this(false, true, false, false) // For deserialization
def useDisk: Boolean = _useDisk
def useMemory: Boolean = _useMemory
def useOffHeap: Boolean = _useOffHeap
def deserialized: Boolean = _deserialized
def replication: Int = _replication
StorageLevel类的主构造器包含了5个参数:
- useDisk:使用硬盘(外存)。
- useMemory:使用内存。
- useOffHeap:使用堆外内存,这是Java虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。
- deserialized:反序列化,其逆过程序列化(Serialization)是java提供的一种机制,将对象表示成一连串的字节;而反序列化就表示将字节恢复为对象的过程。序列化是对象永久化的一种机制,可以将对象及其属性保存起来,并能在反序列化后直接恢复这个对象。
- replication:备份数(在多个节点上备份)。
理解了这 5 个参数,就不难理解不同缓存级别的含义了,比如 val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
缓存,表示将 RDD 的数据持久化在硬盘以及内存中,对数据进行序列化存储,并且将每个持久化的数据都复制一份副本保存到其他节点。
三、持久化级别选择
Spark 提供了这么多中持久化策略,那在实际场景中应该如何使用呢?
通常遵循的准则是,优先考虑内存,内存放不下就考虑序列化后放到内存中,尽量不要存储到磁盘中,因为一般 RDD 的重新计算要比从磁盘中读取更快,只有在需要更快的恢复时才使用备份级别(所有的存储级别都可以通过重新计算来提供全面的容错性,但是备份级别允许用于在 RDD 的备份上执行任务,而无须重新计算丢失的分区)。具体的选取方式如下:
- 采用默认情况下性能最高的 MEMORY_ONLY。该持续化级别下,对 RDD 的后续算子操作,都是基于纯内存中的数据操作,不需要从磁盘文件中读取数据,性能很高,也不需要进行序列化与反序列化操作,避免了这部分开销。但要注意的是,如果 RDD 的数据比较多(比如几十亿条),直接用这种持久化级别可能会导致 JVM 的 OOM 内存溢出异常。
- 如果使用 MEMROY_ONLY 级别发生了内存溢出,建议尝试使用 MEMROY_ONLY_SER 级别。该级别会将 RDD 数据序列化后再保存到内存中,此时每个 partition 仅仅是一个字节数组,减少了对象数量,并降低内存占用,该级别比 MEMORY_ONLY 多出来的性能开销,主要就是序列化和反序列化的开销。同样,如果 RDD 数据过多仍然会导致 OOM 内存溢出异常。
- 如果纯内存级别都无法使用,则建议使用 MEMROY_AND_DISK_SER 策略,而不是 MEMORY_AND_DISK 策略。该策略会优先将数据缓存到内存中,内存缓存不下时才会写入磁盘。
- 通常不建议使用 DISK_ONLY 级别。因为完全基于磁盘文件进行数据的读写,会导致性能急剧下降,有时还不如重新计算一次该 RDD。
- 通常不建议使用后缀为_2的备份级别。因为该级别必须将所有的数据都复制一份副本,并发送到其他节点上,而数据复制和网络传输会导致较大的性能开销。除非是作业的高可用性能要求很高,否则不建议使用。
- OFF_HEAP 级别一般使用得少,但优势也比较明显。该方式将 RDD 数据持久化到 Alluxio 而不是 Executor 内存中,可以避免 Executors 崩溃缓存数据丢失的情况。
四、删除持久化数据
Spark 的机制可以自动监控各个节点上的缓存使用率,并以 LRU (Least Recently Used,近期最少使用)算法删除过时的缓存数据。当然,如果想手动删除一个 RDD 数据的缓存,而不是等待该 RDD 被 Spark 自动移除,可以使用 RDD.unpersist()
方法。
五、 RDD cache 和 persist
我们先来看一个 persist 的示例。
// cache 使用示例:
val rdd1 = sc.textFile("hdfs://nameservice/data/README.md").cache()
rdd1.map(...)
rdd1.reduce(...)
// persist 使用示例
val rdd1 = sc.textFile("hdfs://nameservice/data/README.md").persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(...)
rdd1.reduce(...)
唯一可以看出的是 persist() 持久化是可以手动指定持久化类型的,而 cache() 无须指定。那它们之间到底有什么区别呢?
通过阅读 Spark 2.1.0 源码,可以看到 cache() 方法调用了无参的 persist() 方法。想知道两者的区别,还需要进一步看看 persist() 方法逻辑。
//位置:org/apache/spark/rdd/RDD.scala
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def cache(): this.type = persist()
可以看到 persist() 方法调用了 persist(StorageLevel.MEMORY_ONLY) 方法,即默认缓存方式采用 MEMORY_ONLY 级别。
//位置:org/apache/spark/rdd/RDD.scala
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
继续往下看,persist() 方法有一个 StorageLevel 类型的参数,该参数表示 RDD 的缓存级别。至此,也就能看出 cache 和 persist 的区别了:即 cache 只有一个默认的缓存级别 MEMORY_ONLY,而 persist 可以根据情况设置其他的缓存级别。
//位置:org/apache/spark/rdd/RDD.scala
/**
* Mark this RDD for persisting using the specified level.
*
* @param newLevel the target storage level
* @param allowOverride whether to override any existing level with the new one
*/
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
// TODO: Handle changes of StorageLevel
if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
throw new UnsupportedOperationException(
"Cannot change storage level of an RDD after it was already assigned a level")
}
// If this is the first time this RDD is marked for persisting, register it
// with the SparkContext for cleanups and accounting. Do this only once.
if (storageLevel == StorageLevel.NONE) {
sc.cleaner.foreach(_.registerRDDForCleanup(this))
sc.persistRDD(this)
}
storageLevel = newLevel
this
}
六、RDD 的 checkpoint
把数据通过 cache 或 persist 持久化到内存或磁盘中,虽然是快速的但却不是最可靠的,checkpoint 机制的产生就是为了更加可靠地持久化数据以复用 RDD 计算数据,通常针对整个 RDD 计算链路中特别需要数据持久化的缓解,启用 checkpoint 机制来确保高容错和高可用性。
可以通过调用 SparkContext.setCheckpointDir() 方法来指定 checkpoint 是持久化的 RDD 数据的存放位置,这里可以存在本地或 HDFS 中(生产环境通常是放在 HDFS 上,借助 HDFS 本身的高容错和高可靠的特性完成数据的持久化),同时为了提高效率,可以指定多个目录。
需要说明的是,checkpoint 和 persist 一样是惰性执行的,在对某个 RDD 标记了需要 checkpoint 后,并不会立即执行,只有在后续有 Action 触发 Job 从而导致该 RDD 的计算,且在这个 Job 执行完成后,才会从后往前回溯找到标记了 checkpoint 的 RDD,然后重新启动一个 Job 来执行具体的 checkpoint 操作,所以一般都会对需要进行 checkpoint 的 RDD 先进行 persist 标记,从而把该 RDD 的计算结果持久化到内存或者磁盘上,以备 checkpoint 复用。
下面是 checkpoint 使用的一个示例:
// 配置 checkpointDir
sc.setCheckpointDir("hdfs://nameservice/spark/checkpoint")
val rdd1 = sc.textFile("hdfs://nameservice/data/README.md").cache()
// 对 rdd1 标记 checkpoint
rdd1.checkpoint()
// action 触发了 Job 才能导致 checkpoint 的真正执行
rdd1.count()
七、DataSet 的 cache 和 persist
阅读源码中无意中看到 DataSet 也支持 cache 和 persist 持久化方式,和 RDD 的持久化还是不太一样,我们来看看代码。
// 位置:org/apache/spark/sql/Dataset.scala
/**
* Persist this Dataset with the default storage level (`MEMORY_AND_DISK`).
*
* @group basic
* @since 1.6.0
*/
// DataSet 的 cache 持久化调用
def cache(): this.type = persist()
/**
* Persist this Dataset with the default storage level (`MEMORY_AND_DISK`).
*
* @group basic
* @since 1.6.0
*/
def persist(): this.type = {
sparkSession.sharedState.cacheManager.cacheQuery(this)
this
}
// 位置:org/apache/spark/sql/Dataset.scala
/**
* Persist this Dataset with the given storage level.
* @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`,
* `MEMORY_AND_DISK_SER`, `DISK_ONLY`, `MEMORY_ONLY_2`,
* `MEMORY_AND_DISK_2`, etc.
*
* @group basic
* @since 1.6.0
*/
// DataSet 的 persist 持久化调用
def persist(newLevel: StorageLevel): this.type = {
sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
this
}
// 位置:org/apache/spark/sql/execution/CacheManager.scala
/**
* Caches the data produced by the logical representation of the given [[Dataset]].
* Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
* recomputing the in-memory columnar representation of the underlying table is expensive.
*/
// cache 和 persist 持久化调用的方法
def cacheQuery(
query: Dataset[_],
tableName: Option[String] = None,
storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock
通过源码看到 cache() 调用的是无参的 persist() 方法,而 persist 调用 cacheQuery 方法,虽然 cache 和 persist 两者最终调用都是 cacheQuery 方法,但 cache 是采用默认的持久化级别 MEMORY_ADN_DISK
,而 persist 则是用户自定义,这里默认的持久化持久和 RDD 是不一样的。
参考连接
- https://blog.csdn.net/houmou/article/details/52491419
- https://dongkelun.com/2018/06/03/sparkCacheAndPersist/
- https://blog.csdn.net/qq_27639777/article/details/82319560