寒假学习进度8

今天继续学习spark双value算子

(1)

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
val sc = new SparkContext(sparkConf)

//双value,数据源类型要保持一致,拉链类型可以不一致

//会报错,因为拉链分区数量要保持一致,并且分区中的数量要一致
val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4,5,6),2)
val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 7, 8),4)

//拉链
val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
println(rdd6.collect().mkString(","))

sc.stop()
}

(2)partitionBy

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
val sc = new SparkContext(sparkConf)

val rdd = sc.makeRDD(List(1, 2, 3, 4),2)

//将int类型转换成taper类型
val maprdd: RDD[(Int, Int)] = rdd.map((_, 1))

//partitionBy根据指定的分区规则对数据重新分区
maprdd.partitionBy(new HashPartitioner(2)).saveAsTextFile("output")


sc.stop()
}

(3)reduceByKey

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
val sc = new SparkContext(sparkConf)

val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",1)))

//reduceByKey,相同的key的数据进行value数据的聚合操作
//[1,2]
//[3,3]
//[6]
val reducerdd: RDD[(String, Int)] = rdd.reduceByKey((x: Int, y: Int) => {
x + y
})

reducerdd.collect().foreach(println)


sc.stop()
}

(4)groupByKey

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
val sc = new SparkContext(sparkConf)

val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",1)))

//groupByKey,将数据源中的数据,相同的key分在一个组中,形成一个对偶元祖
val grouprdd: RDD[(String, Iterable[Int])] = rdd.groupByKey()

grouprdd.collect().foreach(println)

sc.stop()
}

(5)aggregateByKey

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator15")
val sc = new SparkContext(sparkConf)

val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("a",4)),2)

//aggregateByKey操作函数柯里化,2个参数列表
//第一个参数列表,需要一个参数,表示初始值
//主要用于第一个key时,和value进行分区计算
//第二个参数列表需要2个参数
//第一个参数表示分区内的计算规则
//第二个参数表示分区间计算规则
rdd.aggregateByKey(0)(
(x,y)=>math.max(x,y),
(x,y)=>x+y
).collect().foreach(println)

sc.stop()
}
上一篇:寒假学习进度


下一篇:寒假学习进度