今天学习spark的一些行动算子和序列化
(1)行动算子
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3)))
rdd.saveAsTextFile("output")
rdd.saveAsObjectFile("output1")
//saveAsSequenceFile要求数据类型为key-value类型
rdd.saveAsSequenceFile("output2")
sc.stop()
}
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3)))
val stringToLong: collection.Map[String, Long] = rdd.countByKey()
print(stringToLong)
sc.stop()
}
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3)))
//driver端中内存循环打印
rdd.collect().foreach(println)
println("0000000000")
//executol端中内存循环打印
rdd.foreach(println)
sc.stop()
}
(2)RDD 序列化(使用算子外的数据要进行序列化,不然就意味着无法传值给 Executor端执行)
在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
val user=new User()
rdd.foreach(
num=>{
println("age="+(user.age+num))
}
)
sc.stop()
}
//序列化对象,如果不序列化,无法传对象到executor端进行处理会报错
class User extends Serializable{
var age:Int=30
}
//样例类编译时会自动序列化,跟上面一样
case class User1(){
var age:Int=30
}