忙忙碌碌一天,啥都没干
小谈:
今天啥也没干,不知不觉已经是大年初五了,再过五六七八天就要开始考科目四了,希望早早拿到驾照
combineByKey
先解释一个每个参数的意义
- createCombiner:分组内的创建组合的函数,通俗点就是将都进来的数据进行初始化,把当前的值作为参数,可以进行一些转换操作.分区内每种key调用一次
- mergeValue:分区内的合并函数,作用在每一个分区,也就是将分组后的Value合并到之前的转换后的C之后。将createCombiner结果与相同的key对应的值最累加
- mergeCombiners:该函数主要将多分区合并,各个分区相同的Key对应的结果做聚合
combineByKey()会遍历分区中的所有元素,因此每个元素要么之前遇到过,要么没有遇到过,如果是一个新的元素(指的是这个key),那么就会使用createCombiner来创建那个键对应的累加器的初始值,只会每个键第一次出现的时候发生,并不是出现一个就发生一个
如果是之前已经出现的key,那么就会使用mergeValue来将该键对应的累加器的当前值与这个新的值进行合并
只讲参数意义不给代码实战就是耍流氓。
下面通过一个案例来实战一个这个函数
有一个源源梓同学,求她的成绩平均数。三门科目分数97,96,95.求她的平均成绩,三门科目,总分288分。平均分数为96分。
来看代码
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount")
val sparkContext = new SparkContext(wordCount)
val value1 = sparkContext.makeRDD(
List(("源源梓", 97), ("源源梓", 96), ("源源梓", 95)),2)
val value = value1.combineByKey(
//将成绩转换成 成绩 ==》
(成绩,1) score => (score, 1),
//分区内的规则 (成绩,1) =》 (成绩+成绩,1+1)
(tuple1: (Int, Int), v) => (tuple1._1 + v, tuple1._2 + 1),
//分区间的规则 成绩和成绩相加,次数和次数相加
(tuple2: (Int, Int), tuple3: (Int, Int)) =>
(tuple2._1 + tuple3._1, tuple2._2 + tuple3._2) )
val value2 = value.map {
case (name, (score, num)) => (name, score / num) }
value2.collect().foreach(println(_))
可能光看代码看不懂,下面来一一讲解一下。
1..首先创建RDD
val value1 = sparkContext.makeRDD(List(("源源梓", 97), ("源源梓", 96), ("源源梓", 95)),2)
2.调用combineByKey算子
val value = value1.combineByKey(
//createCombiner,通过key分组后,97,96,95
//平均分数=总分数 / 科目门数
//先对每门科目进行转换
score => (score, 1)
//分区内的规则 (成绩,1) =》 (成绩+成绩,1+1)
//(97,1) 分区2:(96 + 95,2)
(tuple1: (Int, Int), v) => (tuple1._1 + v, tuple1._2 + 1),
//分区间的规则 成绩和成绩相加,次数和次数相加
(tuple2: (Int, Int), tuple3: (Int, Int)) =>
(tuple2._1 + tuple3._1, tuple2._2 + tuple3._2) )
3.计算平均数
//value的返回值类型
[String,[Int,Int]]
//String 就是 源源梓 第一个Int:总分 第二个Int:科目数
val value2 = value.map {
case (name, (score, num)) => (name, score / num) }
看一下上面案例的类似图解
上面这个图解可以更明了的来看出来这个算子的作用。
刚开始分组后,对分组的数据进行转换
在分区内对相同组内的数据进行相加,总数相加的同时个数也相加
分区间对相同key的数据进行相加,(这里的数据就是在分区内计算出来的数据)
sortByKey
在一个(k,v)的RDD上调用,K必须实现Ordered特质,返回一个按照key进行排序
先看一下这个算子
def sortByKey(ascending: Boolean = true,
numPartitions: Int = self.partitions.length) : RDD[(K, V)]
= self.withScope { val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part) .setKeyOrdering(if (ascending) ordering else ordering.reverse) }
在使用这个算子的时候一定会发生shuffle,因为底层继承了new ShuffledRDD。当然拉,可以根据第二个参数来选择是升序还是降序排列。
通过代码来认识一下
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount")
val sparkContext = new SparkContext(wordCount)
val value1 = sparkContext.makeRDD(List((a, 97), (b, 96), (c, 95)),2) value1.sortByKey().collect().foreach(println(_))
默认是升序的,那么结果就是
(a,97) (b,96) (c,95)
MapValues
有时候,我们只想访问Pair RDD的值部分,这时操作二元组很麻烦,这个时候使用Spark提供的mapValues(func)函数。功能类似于map{case (x,y) : (x,func(y))}
总结:
今天摆烂了,感觉写的好少好少。明天一定好好学习,好好输出文章,积攒这么多的文笔,得要好好发挥发挥。
明天将会输出 TopN案例和动作算子。如果可以还会讲RangePartitioner。
还差四五百阅读量就可以破两万了,虽然这篇博客挺差劲的,希望明天醒来可以看到阅读量破两万。
如果破两万,一定好好写博客。摆烂太难受了