spark(16)RDD的缓存机制、checkpoint机制

RDD的缓存机制(★★★★★)

什么是rdd的缓存

spark可以把一个rdd的数据缓存起来,后续有其他的job需要用到该rdd的结果数据,可以直接从缓存中获取得到,避免了重复计算。缓存是加快后续对该数据的访问操作。

如何对rdd设置缓存

可以通过persist方法或cache方法将前面的RDD的数据缓存。但这两个方法被调用时不会立即执行缓存操作,而是触发后面的action时,才将RDD缓存在计算节点的内存中,并供后面重用。

persist方法和cache方法的源代码如下,可以看到cache方法内调用了persist方法,persist方法的参数的默认值是StorageLevel.MEMORY_ONLY。

/**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

  /**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def cache(): this.type = persist()

StorageLevel的部分源码带,StorageLevel是一个object,里面定义了不同的变量来表示不同的存储级别。

  1. NONE 不进行缓存
  2. DISK_ONLY 缓存到磁盘 DISK_ONLY_2 缓存到磁盘,2份
  3. MEMORY_ONLY 缓存到内存 MEMORY_ONLY_2 缓存到内存2份
  4. MEMORY_ONLY_SER 序列化后缓存到内存 MEMORY_ONLY_SER_2 序列化后缓存到内存2份
  5. MEMORY_AND_DISK 缓存到内存或磁盘 MEMORY_AND_DISK_2 缓存到内存或磁盘2份
  6. MEMORY_AND_DISK_SER 缓存到内存或磁盘且序列化 MEMORY_AND_DISK_SER_2 ...
  7. OFF_HEAP 缓存到堆外

注意:MEMORY_AND_DISK并不是把数据缓存一部分在内存中一部分在磁盘中,而是优先考虑内存,内存不够了才缓存到磁盘。

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

cache和persist的使用示例

打开spark shell

spark-shell --master spark://node01:7077 --executor-memory 1g --total-executor-cores 2

登录8080端口的spark页面,找到spark shell对应的Application,点击Spark shell

spark(16)RDD的缓存机制、checkpoint机制

点击后,就进入了http://node01:4040/jobs/,然后切换到Storage

spark(16)RDD的缓存机制、checkpoint机制

往spark shell一行行执行下列代码,注意刷新观察Storage界面的变化。

val rdd1=sc.textFile("/words.txt")
val rdd2=rdd1.flatMap(_.split(" "))
val rdd3=rdd2.cache
rdd3.collect

执行完rdd3.collect后,页面才发生了变化,如下图,图中显示存储在内存中的大小为440.0B,磁盘为0:

spark(16)RDD的缓存机制、checkpoint机制

继续执行下列代码:

val rdd4=rdd3.map((_,1))
val rdd5=rdd4.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
rdd5.collect

执行rdd5.collect后,页面再次发生变化,如下图:

spark(16)RDD的缓存机制、checkpoint机制

cache和persist的区别(面试题

简述下如何对RDD设置缓存,以及它们的区别是什么?

对RDD设置缓存成可以调用rdd的2个方法: 一个是cache,一个是persist,调用这2个方法都可以对rdd的数据设置缓存,但不是立即就触发缓存执行,后面需要有action,才会触发缓存的执行。

cache方法和persist方法区别:

  1. cache: 默认是把数据缓存在内存中,其本质就是调用persist方法;
  2. persist:可以把数据缓存在内存或者是磁盘,有丰富的缓存级别,这些缓存级别都被定义在StorageLevel这个object中。

什么时候需要设置缓存?

首先理解一个概念:transformation算子是延迟加载的,只有在触发action时才会被执行,job执行完之后,前面所有rdd的数据就都不存在了,如果没有action算子,各个rdd之间就只是一个转换

1、某个rdd的数据后期被使用了多次

spark(16)RDD的缓存机制、checkpoint机制

如上图所示的计算逻辑:

当第一次使用rdd2做相应的算子操作得到rdd3的时候,就会从rdd1开始计算,先读取HDFS上的文件,然后对rdd1 做对应的算子操作得到rdd2,再由rdd2计算之后得到rdd3。同样为了计算得到rdd4,前面的逻辑会被重新计算。

默认情况下多次对同一个rdd执行算子操作, rdd都会对这个rdd及之前的父rdd全部重新计算一次。 这种情况在实际开发代码的时候会经常遇到,但是我们一定要避免一个rdd重复计算多次,否则会导致性能急剧降低。

因此,可以把多次使用到的rdd,也就是公共rdd进行持久化,避免后续需要,再次重新计算,提升效率。如下图,在设置了rdd2.cache或rdd2.persist后,得到rrd3时(假设rdd2-->rdd3是一个action),步骤还是HDFS-->rdd1-->rdd2-->rdd3,但是因为rdd3是rdd2经过action算子操作得到的,rrd2的数据得到缓存

那么生成rdd4的时候,步骤就简单了很多,直接从缓存中获取数据,计算得到rdd4。

spark(16)RDD的缓存机制、checkpoint机制

2、为了获取得到一个rdd的结果数据,经过了大量的算子操作或者是计算逻辑比较复杂,总之某个rdd的数据来之不易时,可以进行缓存:

val rdd2=rdd1.flatMap(函数).map(函数).reduceByKey(函数).xxx.xxx.xxx.xxx.xxx

清除缓存数据

自动清除

一个application应用程序结束之后,对应的缓存数据也就自动清除

手动清除

调用rdd的unpersist方法

RDD的checkpoint机制(★★★★★)

checkpoint概念

我们可以对rdd的数据进行缓存,保存在内存或者是磁盘中。后续就可以直接从内存或者磁盘中获取得到,但是它们不是特别安全。

cache

它是直接把数据保存在内存中,后续操作起来速度比较快,直接从内存中获取得到。但这种方式很不安全,由于服务器挂掉或者是进程终止,会导致数据的丢失

persist

它可以把数据保存在本地磁盘中,后续可以从磁盘中获取得到该数据,但它也不是特别安全,由于系统管理员一些误操作删除了,或者是磁盘损坏,也有可能导致数据的丢失

checkpoint(检查点)

它是提供了一种相对而言更加可靠的数据持久化方式。它是把数据保存在分布式文件系统,比如HDFS上。这里就是利用了HDFS高可用性,高容错性(多副本)来最大程度保证数据的安全性。

如何设置checkpoint

1、在hdfs上设置一个checkpoint目录

sc.setCheckpointDir("hdfs://node01:8020/checkpoint") 

2、对需要做checkpoint操作的rdd调用checkpoint方法

val rdd1=sc.textFile("/words.txt")
rdd1.checkpoint
val rdd2=rdd1.flatMap(_.split(" ")) 

3、最后需要有一个action操作去触发任务的运行

rdd2.collect

查看缓存中hdfs中的数据:

[hadoop@node01 ~]$ hdfs dfs -ls /checkpoint/e237e2bb-dc0e-47d9-851f-26687b0d7dbe/rdd-5
Found 2 items
-rw-r--r--   3 hadoop supergroup         53 2020-04-17 10:20 /checkpoint/e237e2bb-dc0e-47d9-851f-26687b0d7dbe/rdd-5/part-00000
-rw-r--r--   3 hadoop supergroup          4 2020-04-17 10:20 /checkpoint/e237e2bb-dc0e-47d9-851f-26687b0d7dbe/rdd-5/part-00001

cache、persist、checkpoint三者区别

cache和persist

  • cache默认数据缓存在内存中
  • persist可以把数据保存在内存或者磁盘中
  • 后续要触发 cache 和 persist 持久化操作,需要有一个action操作
  • 它不会开启其他新的任务,一个action操作就对应一个job
  • 它不会改变rdd的依赖关系,程序运行完成后对应的缓存数据就自动消失

checkpoint

  • 可以把数据持久化写入到hdfs上

  • 后续要触发checkpoint持久化操作,需要有一个action操作,后续会开启新的job执行checkpoint操作

  • 它会改变rdd的依赖关系,后续数据丢失了不能够在通过血统进行数据的恢复。

  • 程序运行完成后对应的checkpoint数据就不会消失

cache或persisit与checkpoint的结合使用:

   sc.setCheckpointDir("/checkpoint")
   val rdd1=sc.textFile("/words.txt")
   val rdd2=rdd1.cache
   rdd2.checkpoint
   val rdd3=rdd2.flatMap(_.split(" "))
   rdd3.collect
   
//对checkpoint在使用的时候进行优化,在调用checkpoint操作之前,可以先来做一个cache操作,缓存对应rdd的结果数据,后续就可以直接从cache中获取到rdd的数据写入到指定checkpoint目录中   
上一篇:Checkpoint防火墙


下一篇:MySQL数据库意外掉线,数据这么恢复吗