spark封神之路(11)-RDD缓存

RDD缓存

RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

当一个RDD多次被使用,或者是RDD的计算任务链比较长,计算的结果相对较珍贵的场景

下图中标注的是两个Job中的RDD是不能同时使用一份数据

spark封神之路(11)-RDD缓存

使用缓存以后数据只加载一次 , 将多次使用的RDD数据缓存起来

spark封神之路(11)-RDD缓存

persist和cache()

 def main(args: Array[String]): Unit = {
    val sc: SparkContext = SparkUtil.getSc
​
    val rdd1: RDD[String] = sc.textFile("d://word.txt")
    val rdd2: RDD[String] = rdd1.flatMap(_.split("\\s+"))
    val rdd3: RDD[String] = rdd2.map(e => {
      println("---------------------")
      e
    })
    val rdd4: RDD[(String, Int)] = rdd3.map(word => {
      (word, 1)
    })
    // 仅仅为了提升多次使用处理数据的效率可以将数据缓存在内存中
    rdd4.persist(StorageLevel.MEMORY_ONLY)
    //  def cache(): this.type = persist()
    // 这个函数也可以将数据缓存起来 ,函数的本质也是persist,用户只能将数据缓存在默认的内存中
    // rdd4.cache()
    // 获取RDD4 以后 在后续的处理数据的时候这个RDD使用了两次 , 对象的重复使用
    // 但是我们要明确的是  RDD中不会存储数据 ,也就是数据没有重复使用 ,仅仅是使用的地址值
    // 在控制台上打印的结果我们可以看出来 使用两次RDD其实是加载执行了两次数据
    // 显然这样的效率不高
    val res1: RDD[(String, Int)] = rdd4.reduceByKey(_ + _)
    res1.foreach(println)
    val res2: RDD[(String, Iterable[Int])] = rdd4.groupByKey()
    res2.foreach(println)
    sc.stop()
  }

上面的两个函数将数据缓存在指定的临时文件上 ,当程序运行完毕以后会将数据删除

上一篇:面试系列五 之 项目涉及技术Spark


下一篇:网站用户行为分析项目之会话切割(四)=> 代码重构