目录
为了引入这两个机制,这里我们手写一个实现Pi的操作,这里我们只需要求出落在圆里的点与落在正方形里面的点的概率比值即可
这里我们着重突出
slices表示生成多少个任务
cnt表示每一个任务内生成多少个点
这里的任务数,我们每一次计算都会创建一个任务task,这就导致需要处理的文件数量非常多,这里我们就可以适当的减少slices的数量,增加cnt的数量,来提高计算效率;虽然要计算的点的数量是一样的,但是效率是完全不一样的
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("Demo6Pi")
val sc = new SparkContext(conf)
//这是0-1之间的随机数
// println(Random.nextDouble())
//slices表示生成多少个任务,cnt表示每个任务内生成多少个点
val slices = 100
val cnt = 10000
//parallelize 支持传入一个参数numSlices,默认是同并行度一致,可以手动指定,表示最后生成的RDD分区数是多少
//最终会决定task的数量
val seqRDD: RDD[Int] = sc.parallelize(0 to cnt*slices,slices)
//这里随机生成N个[-1,1]之间的点
val pointRDD: RDD[(Double, Double)] = seqRDD.map(seq => {
val x: Double = Random.nextDouble() * 2 - 1
val y: Double = Random.nextDouble() * 2 - 1
(x, y)
})
//从这些点中过滤出圆内的点
val circlePointNum: Long = pointRDD.filter(
(kv) => {
val x: Double = kv._1
val y: Double = kv._2
val res: Double = x * x + y * y
res <= 1
}
).count()
println(circlePointNum)
val pi: Double = circlePointNum.toDouble/ (cnt * slices) * 4
println(pi)
}
cache->提升效率
我们的缓存是缓存到Executor中,这里面有CPU,有内存,程序也是在Executor中运行的
为什么要使用cache
一直在我们没有加上cache缓存之前,我们所跑的spark任务和mapreduce没有什么区别,都是map和reduce,虽然在spark中归成了stage,但原理都是map和reduce,中间夹杂着shuffle
这里我们计算每个班的学生人数和总体的性别人数
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("Demo16Cache")
val sc: SparkContext = new SparkContext(conf)
val stuRDD: RDD[String] = sc.textFile("D:\\BigDaTa\\JAVA_Project\\ShuJia01\\data\\students.txt")
//统计班级人数
stuRDD.map(line=>{
val strings: Array[String] = line.split(",")
(strings(4),1)
}).reduceByKey(_+_)
.foreach(println)
//统计性别人数
stuRDD.map(line=>{
val strings: Array[String] = line.split(",")
(strings(3),1)
}).reduceByKey(_+_)
.foreach(println)
我们可以发现这里面stuRDD被我们重复调用了
怎么能看出来被重复调用了呢,我们做一些小调整,对读取数据的stuRDD,我们使用一个map方法,在每次使用它的时候,我们都加上一个print
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("Demo16Cache")
val sc: SparkContext = new SparkContext(conf)
val stuRDD: RDD[String] = sc.textFile("D:\\BigDaTa\\JAVA_Project\\ShuJia01\\data\\students.txt")
val stuRDDMap: RDD[String] = stuRDD.map(stu => {
println("stuRDD")
stu
})
// stuRDDMap.cache()
//统计班级人数
stuRDDMap.map(line=>{
val strings: Array[String] = line.split(",")
(strings(4),1)
}).reduceByKey(_+_)
.foreach(println)
//统计性别人数
stuRDDMap.map(line=>{
val strings: Array[String] = line.split(",")
(strings(3),1)
}).reduceByKey(_+_)
.foreach(println)
结果我们可以发现,在两次打印数据的时候(一个学生数据,一个性别数据),都打印了”stuRDD“出来,
所以这里我们可以发现,每一次读取数据使用的时候,都调用了两次stuRDD
每一次做计算的时候都需要重复从HDFS取数据,再加载成stuRDD,然后再拉去做计算,这样做不符合spark的计算特性
可见这篇博客
spark之所以计算速度快,就是在做计算的时候,不需要重复取数据来计算,所以这里,我们可以将数据做一个缓存,每次取这部分数使用的时候,就不需要重复从HDFS进行选取了
一份数据被取多次的时候,不需要重复读取,(重复读取的话和mapreduce没有什么区别)
cache缓存
被使用多次的RDD我们可以进行缓存
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("Demo16Cache")
val sc: SparkContext = new SparkContext(conf)
val stuRDD: RDD[String] = sc.textFile("D:\\BigDaTa\\JAVA_Project\\ShuJia01\\data\\students.txt")
val stuRDDMap: RDD[String] = stuRDD.map(stu => {
println("stuRDD")
stu
})
stuRDDMap.cache()
//统计班级人数
stuRDDMap.map(line=>{
val strings: Array[String] = line.split(",")
(strings(4),1)
}).reduceByKey(_+_)
.foreach(println)
//统计性别人数
stuRDDMap.map(line=>{
val strings: Array[String] = line.split(",")
(strings(3),1)
}).reduceByKey(_+_)
.foreach(println)
这里我们运行之后发现,在打印一份班级结果和一份性别人数的同时,只打印了一次“stuRDD”
缓存的级别
我们可以看到,cache默认调用的也是persist方法
而再点进去persist,我们可以看到这里默认给了一个MEMORY_ONLY
而再点进去StorageLevel,我们可以看到这里面默认给了许多缓存机制
cache和persist的区别
我们可以看到,cache和persist默认都可以做缓存,只是cache默认只能使用MEMORY_ONLY,也就是基于内存缓存的策略,而persist里面可以给出许多不同的缓存策略
缓存策略的选择
这里面
DISK_ONLY指的是基于磁盘存储缓存
MEMORY_ONLY指的是基于内存存储缓存
而后面加上SER的含义是需不需要压缩
加上2的含义是需不需要备份
1.数据不大,内存足的时候
选择MEMORY_ONLY
2.数据量有点大,内存不能完全放下的时候
选择MEMORY_AND_DISK_SER
尽可能将数据缓存到内存中,这样效率是最高的
unpersist释放缓存
我们spark中还有sparkstreaming,数据是源源不断的读取进来的,如果我们不断的缓存,迟早会造成内存溢出,这时候我们就需要释放缓存
在某个用不到缓存的地方,就释放缓存
checkpoint->容错
持久化,HDFS不挂,它就不挂
在上面,我们已经得知了,cache缓存的数据是在Executor中,但是Executor有可能会挂,一旦挂了,我们的数据就没有了,需要重新计算;
这时候我们可以将缓存的数据写入HDFS,但这样的效率会降低,这种方式我们可以用来容错
这里我们只需要做这两步即可
不过我们这里的缓存,这里输出了两次“stuRDD”,这就和我们checkpoint的工作机制有关
checkpoint的原理
1.先从前往后计算一遍,遇到哪个需要被checkpoint,就做一个标记
2.重新启动一个新的job,重新计算一遍,并将被标记的rdd写入HDFS
而我们的cache是在遇到需要被缓存的rdd之后,就直接将其写进内存
优化
最好在调用checkpoint之前我们就使用一次cache,这样的话,重新启动的job只需要将内存中的数据拷贝到HDFS即可,这里就可以省去计算的过程
在checkpoint的job执行完成之后,会将这个RDD的依赖关系统一切断
所以cache主要用于提升效率,checkpoint主要用于容错
感谢阅读,我是啊帅和和,一位大数据专业大四学生,祝你快乐。