RDD缓存
RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
当一个RDD多次被使用,或者是RDD的计算任务链比较长,计算的结果相对较珍贵的场景
下图中标注的是两个Job中的RDD是不能同时使用一份数据
使用缓存以后数据只加载一次 , 将多次使用的RDD数据缓存起来
persist和cache()
def main(args: Array[String]): Unit = {
val sc: SparkContext = SparkUtil.getSc
val rdd1: RDD[String] = sc.textFile("d://word.txt")
val rdd2: RDD[String] = rdd1.flatMap(_.split("\\s+"))
val rdd3: RDD[String] = rdd2.map(e => {
println("---------------------")
e
})
val rdd4: RDD[(String, Int)] = rdd3.map(word => {
(word, 1)
})
// 仅仅为了提升多次使用处理数据的效率可以将数据缓存在内存中
rdd4.persist(StorageLevel.MEMORY_ONLY)
// def cache(): this.type = persist()
// 这个函数也可以将数据缓存起来 ,函数的本质也是persist,用户只能将数据缓存在默认的内存中
// rdd4.cache()
// 获取RDD4 以后 在后续的处理数据的时候这个RDD使用了两次 , 对象的重复使用
// 但是我们要明确的是 RDD中不会存储数据 ,也就是数据没有重复使用 ,仅仅是使用的地址值
// 在控制台上打印的结果我们可以看出来 使用两次RDD其实是加载执行了两次数据
// 显然这样的效率不高
val res1: RDD[(String, Int)] = rdd4.reduceByKey(_ + _)
res1.foreach(println)
val res2: RDD[(String, Iterable[Int])] = rdd4.groupByKey()
res2.foreach(println)
sc.stop()
}
上面的两个函数将数据缓存在指定的临时文件上 ,当程序运行完毕以后会将数据删除