今天是大年初三,猴赛雷
小谈
这几天每天晚上给她发一个红包,拜年红包而且还可以添加表情包。感觉现在过年好没有年味吖。嗑瓜子磕的嗓子都疼了。
Spark中的算子有很多,有Value类型,双Value类型,这两天写的都是Value类型的,昨天讲的是关于map的映射。
今天讲剩余的算子
glom
glom算子将RDD中的每一个分区变成一个数组,并放置在RDD中,数组中的元素类型与原分区中的类型相同。原本这个分区里面的数是分散的,glom之后,这个分区里面的元素会变成一个数组。
def glom(): RDD[Array[T]] = withScope {
new MapPartitionsRDD[Array[T], T]
(this, (_, _, iter) => Iterator(iter.toArray))
可以看出来,glom底层用到了MapPartitionsRDD对象,这个MapPartitionsRDD对象底层重写了getPartitions方法,这个方法用到了RDD的依赖,保持分区的数据不变,然后将分区的数据转变成数组类型。
来一个例子,计算所有分区最大值的求和。
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sparkContext = new SparkContext(sparkConf)
// 两个分区 12 在分区1 35 在分区2
val value = sparkContext.makeRDD(List(1, 2, 3, 5), 2)
//对每个分区的数组glom
val value1 = value.glom()
//找到每个数组的最大值
val value2 = value1.map(array => array.max)
//将每个数组最大值进行求和
val sum = value2.collect().sum
分区最大值求和
分区1的最大值: 2 分区2的最大值: 5
collect之后,每个节点给Driver返回每个数组的最大值,之后相加
groupBy
将数据按照指定的规则进行分组,分区默认不变,但是可能分组后的数据不再原来的分区,数据会被打乱,这样的操作就是Shuffle。极限情况下,数据可能分在同一个分区。
将相同的key的数据分到一个迭代器里面。
下面举一个例子
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount")
val sparkContext = new SparkContext(wordCount)
//将数据按照指定的规则进行分组
val value = sparkContext.makeRDD(List(1, 2, 3, 4), 2)
val value1 = value.groupBy(_ % 2 == 0)
value1.collect().foreach(println(_))
刚开始 两个分区
分区1: 1 2 分区2:3 4
group by之后,数据发生改变,只有一个分区存在数据
(1,(false,CompactBuffer(1, 3))) (1,(true,CompactBuffer(2, 4)))
可以看到,分区后的数据并不是上面图解的那样,因为发生了Shuffle
为了体现出group by的作用,下面将将 List("Hello", "hive", "hbase", "Hadoop")根据单词首写字母进行分组。
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount")
val sparkContext = new SparkContext(wordCount)
val value = sparkContext.makeRDD(List("Hello", "hbase", "Hive", "Hadoop"), 2)
val value1 = value.groupBy(_.charAt(0))
value1.collect().foreach(println(_)) }
看一下结果
(h,CompactBuffer(hbase)) (H,CompactBuffer(Hello, Hive, Hadoop))
前面的字符是我们分组的依据,后面的迭代器就是分组后的元素
WordCount
学习mr的时候,第一个例子就是求WordCount。下面也用Spark来进行操作这就是简易的wc。首先将每个单词分开,分开之后对每个单词进行计数(单词,1),计完数后,根据单词分组,单词分组之后求迭代器的大小。
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount")
val sparkContext = new SparkContext(wordCount)
val value = sparkContext.makeRDD(List("Hello Spark", "Hello Scala"), 2)
//先把单词分成一个一个
val value1 = value.flatMap(_.split(" "))
//对每个单词进行计数
val value2 = value1.map((_, 1))
//分组
val value3 = value2.groupBy(_._1)
val value5 = value3.map {
case (word, iter) => (word, iter.size) }
value5.collect().foreach(println(_))
Filter
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合的丢弃。对数据进行筛选之后,分区不变,但是分区内的数据可能不均衡,可能会造成数据倾斜。
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount")
val sparkContext = new SparkContext(wordCount)
val value = sparkContext.makeRDD(List(1, 2, 3, 4))
val value1 = value.filter(_ != 1)
value1.collect().foreach(println(_))
Distinct
对数据集中的重复数据进行去重
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount")
val sparkContext = new SparkContext(wordCount)
val value = sparkContext.makeRDD(List(1, 2, 3, 4,4))
val value1 = value.distinct()
如果在distinct中没有参数,那么最后的结果就是1,2,3,4
如果在distinct中有参数,比如2,那么最后的结果就会有两个分区,原本一个分区的(因为设置的setMaster为Local,所以默认一个分区),原本一个分区的数据,分散到两个分区里面,发生了shuffle
coalesce
根据数据量缩减分区。如果分区数过多,可以进行减少分区。
coalesce这个算子里面的参数有一个 shuffle ,如果为True,那么这个coalesce就会将数据shuffle,会将数据进行打乱,如果为False,那么数据就不会被打乱。
先来看看例子把
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount")
val sparkContext = new SparkContext(wordCount)
//四个数据,三个分区 1 2 34
val value = sparkContext.makeRDD(List(1, 2, 3, 4),3)
//减少分区为两个,shuffle默认为False,不会打乱数据
val value1 = value.coalesce(2)
//可以来看数据的分区
val value2 = value1.mapPartitionsWithIndex(
(index, iter) => { iter.map(num => (index, num)) })
value2.collect().foreach(println(_))
首先看一下先前的分区
分区1:1 分区2 :2 分区3:3 4
现在看一下在shuffle参数为False的情况下,分区的情况
可以看到,分区真的减少了,明明数据已经不再第三个分区了,跑到第二个分区了,这数据不就被打乱了。
其实并不是,看一看 分区3的数据 3和4 一起进入了第二个分区,第三分区的数据没有被打乱。这就是所谓的shuffle,并没有将数据打乱,只是将原来的数据放到了其它的分区。
现在看一下shuffle参数为True的情况下
如果有Shuffle了,1 2 3 4 分到两个分区里面,
分区1 : 1 3 分区2: 2 4
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount")
val sparkContext = new SparkContext(wordCount)
//四个数据,三个分区 1 2 34
val value = sparkContext.makeRDD(List(1, 2, 3, 4),3)
//减少分区为两个,shuffle参数为true,会有shuffle阶段
val value1 = value.coalesce(2,true)
//可以来看数据的分区
val value2 = value1.mapPartitionsWithIndex(
(index, iter) => { iter.map(num => (index, num)) })
value2.collect().foreach(println(_))
可以看到,原来的第一分区,第二分区里面的数据不改变,只是将第三个分区里面的数据放入到两个分区里面
repartition
repartition算子和coalesce算子基本相同,不过reaprtition算子可以将分区数目减少,也可以将分区数目增大,不论是增大或减小,repartition算子一定会有shuffle阶段,一定会打乱数据。
可以看到,shuffle参数默认就是true。
就是会改变数据的分区
sortBy
该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理
的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一
致。中间存在 shuffle 的过程
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount")
val sparkContext = new SparkContext(wordCount)
val value = sparkContext.makeRDD(List(4, 3, 1, 2))
val value1 = value.sortBy(num => num)
value1.collect().foreach(println(_))
最后结果
可以看到是升序排序的,如果想降序排序,就添加一个参数false
默认是true,按照升序排序
总结
今天总算将Value类型的算子讲完了,明天会将双Value类型的算子讲解完毕。
今年是大年初三,过年也要学习哦