第15课:RDD创建内幕
- RDD的创建方式
Spark应用程序运行过程中,第一个RDD代表了Spark应用程序输入数据的来源,之后通过Trasformation来对RDD进行各种算子的转换,来实现具体的算法
Spark中的基本方式:
1) 使用程序中的集合创建
这种方式的实际意义主要用于测试。
2) 使用本地文件系统创建
这种方式的实际意义主要用于测试大量数据的文件
3) 使用HDFS创建RDD
这种方式为生产环境中最常用的创建RDD的方式
4) 基于DB创建
5) 基于NoSQL:例如HBase
6) 基于S3(SC3)创建
7) 基于数据流创建
- RDD创建实战
1) 通过集合创建
代码:
object RDDBasedOnCollection {
def main (args: Array[String]) {
val conf = new SparkConf()//create SparkConf
conf.setAppName("RDDBasedOnCollection")//set
app name
conf.setMaster("local")//run
local
val sc =new SparkContext(conf)
val numbers = 1 to 100 //创建一个Scala集合
val rdd = sc.parallelize(numbers)
val sum =rdd.reduce(_+_)
//1+2=3 3+3=6 6+4=10
println("1+2+...+99+100"+"="+sum)
}
}
结果:
2)
通过本地文件系统创建
代码:
object RDDBasedOnLocalFile {
def main (args: Array[String]) {
val conf = new SparkConf()//create SparkConf
conf.setAppName("RDDBasedOnCollection")//set app name
conf.setMaster("local")//run local
val sc =new SparkContext(conf)
val rdd = sc.textFile("C:/Users/feng/IdeaProjects/WordCount/src/SparkText.txt")
val linesLength=rdd.map(line=>line.length())
val sum = linesLength.reduce(_+_)
println("the total characters of the file"+"="+sum)
}
}
结果:
3) 通过HDFS创建RDD
代码:
val wordcount = sc.textFile("/library/wordcount/input/licenses").flatMap(_.split(" ")).map(word=>(word,1)).reduceByKey(_+_).filter(pair=>pair._2>20).collect().foreach(println)
结果:
关于spark并行度:
1.默认并行度为程序分配到的cpu core的数目
2.可以手动设置并行度,并行度最佳实践
1. 2-4 partitions for each CPU core
2.综合考虑cpu和 内存
注:本内容原型来自 IMP 课程笔记