spark的转换算子
- map:同分区有序运行,不同分区无序运行
- mapPartitions:一次性取数一个分区,在分区内计算
- golm:将一个分区的数据变成集合
- groupBy:讲数据源中的每一个数据进行key进行分
- filter:过滤,返回布尔类型
- sample
- distinct
- coalesce: 缩减分区
- repartition:其实就是(coalesce(shuffle true))
- sortBy: 会进行shuffle,默认不改变分区且升序
- 双value的操作,(交集,并集,差集,拉链)
- partitionBy:
- groupByKey:不给参数,自动获取第一个为key
- 补充:groupByKey 和 reduceByKey 的区别
- foldByKey:分区内和分区间的计算规则一样时使用
- 使用aggregateByKey求key的平均值
- 可以用combineByKey代替:
- 案例、不同省份的广告点击量前三的城市
转换算子
map:同分区有序运行,不同分区无序运行
(在各个分区内取数据,功能强大,但是效率低)(:/1c37ca978ea74eae9f8c4258b0f9064f)
val result1: RDD[Int] = rdd.map(num => {
println(num)
num * 2
})
mapPartitions:一次性取数一个分区,在分区内计算
效率高,但是只有全部计算完成的时候各分区的数据才会释放(因为对象的引用),内存小数据量大的场合下,可能会内存移除
val result2: RDD[Int] = rdd.mapPartitions(x => {
x.map(_ * 2)})
golm:将一个分区的数据变成集合
val glomRdd: RDD[Array[Int]] = rdd.glom()
val maxRDD: RDD[Int] = glomRdd.map(
data => data.max
)
println(maxRDD.collect().sum)
groupBy:讲数据源中的每一个数据进行key进行分
val rdd: RDD[String] = sc.makeRDD(List("hello java", "spark", "scala"), 2)
def func1(string: String):Char = {
string(0)
}
rdd.groupBy(func1).collect().foreach(println)
filter:过滤,返回布尔类型
val rdd: RDD[String] = sc.makeRDD(List("hello java", "spark", "scala"), 2)
rdd.filter(_.startsWith("s")).collect().foreach(println)
sample
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,7), 2)
println(rdd.sample(
false, // 抽取完是否放回,true为放回
0.4, // 每一个数据被抽取的概率,就算大于1也不一定会被抽取
1 //随机数种子,可以不设置
).collect().mkString(","))
distinct
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 1, 3, 3, 4, 5, 6, 7), 2)
// case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
rdd.distinct().collect().foreach(print)
coalesce: 缩减分区
(不选择shuffle可能后续会数据倾斜)
repartition:其实就是(coalesce(shuffle true))
可以增加分区,调用的coalesce,但是默认给shuffle=true
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
rdd .coalesce(2, true).saveAsTextFile("out")
sortBy: 会进行shuffle,默认不改变分区且升序
val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "2"), (3, "4"), (2, "6")), 2)
// 是经过shuffle的
rdd.sortBy(_._2,false)
.collect().foreach(println)
双value的操作,(交集,并集,差集,拉链)
intersection、union、subtract、zip
交集,并集,差集要求RDD的数据类型要一致;拉链可以不一致
// 交集
val rddInt: RDD[Int] = rdd1.intersection(rdd2)
// 并集
val rddUnion: RDD[Int] = rdd1.union(rdd2)
// 差集
val rddSub: RDD[Int] = rdd1.subtract(rdd2)
// 拉链
val rddZip: RDD[(Int, Int)] = rdd1.zip(rdd2)
partitionBy:
val rddNew: RDD[(Int, Int)] = rdd.map((_, 1))
// 自带hash分区,可重写
val result: RDD[(Int, Int)] = rddNew.partitionBy(new HashPartitioner(2))
result.saveAsTextFile("out")
groupByKey:不给参数,自动获取第一个为key
与groupBy的区别在于groupBy会保留完整的K-V对,groupByKey会提取出value
val GroupRDD: RDD[(Int, Iterable[(Int, String)])] = rdd.groupBy(_._1)
// (1,CompactBuffer((1,spark), (1,hello)))(3,CompactBuffer((3,scala)))(2,CompactBuffer((2,java)))
val GroupByRDD: RDD[(Int, Iterable[String])] = rdd.groupByKey()
// (1,CompactBuffer(spark, hello))(3,CompactBuffer(scala))(2,CompactBuffer(java))
补充:groupByKey 和 reduceByKey 的区别
shuffle角度:groupByKey是将数据打乱,会进行shuffle重新分区,可以再map进行统计
shuffle必须要落盘操作,不能在内存中等待
reduceByKey会在分区中预聚合(combine:类似MR的map端预聚合)。shuffle的数据就会变少一点;性能更高一点
功能角度:groupByKey是进行分组,再可以实现别的操作,功能更灵活,groupByKey是对相同key的value聚合,如果只是需要分组的话就不能使用
val rdd: RDD[(Int, Int)] = sc.makeRDD(List((1, 1), (2, 1), (1, 1), (3, 1)), 1)
rdd.reduceByKey(_ + _).collect().foreach(println)
rdd.groupByKey().map({ case (word, iter) => (word, iter.sum) }).collect().foreach(println)
- aggregateByKey :两个参数列表。第一个参数列表为初始值;第二个参数列表两个参数:第一个参数为分区内操作,第二个参数为分区间操作
val rdd: RDD[(Int, Int)] = sc.makeRDD(List((1, 1), (2, 1), (1, 1), (3, 1)), 2)
rdd.aggregateByKey(0)(math.max(_,_),_+_).collect.foreach(println)
foldByKey:分区内和分区间的计算规则一样时使用
相当于第二个参数列表两个参数相同的aggregateByKey
val rdd: RDD[(Int, Int)] = sc.makeRDD(List((1, 1), (2, 1), (1, 1), (3, 1)), 1)
rdd.foldByKey(0)(_ + _).collect.foreach(println)
使用aggregateByKey求key的平均值
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 3), ("b", 1), ("b", 5)), 2)
/**
* 初始值的(0,0) 第一个0是表示“值”,第二个0表示出现分区键“值”的次数,实际每一次的运算都是将初始值更新。
*/
val rdd1: RDD[(String, (Int, Int))] = rdd.aggregateByKey((0, 0))(
// 第一次运算时:x代表的是(0,0),y是("a", 1)中的 1,运算的结果就是第一个0+y得到第一此运算的“和”,第二个0+1,得到运算的次数;结果为(1,1)
(init_tuple, value) => {
(init_tuple._1 + value, init_tuple._2 + 1)
},
// 各个分区内的数据进行运算,“值”和“值”相加,次数和次数相加
(part1, part2) => {
(part1._1 + part2._1, part1._2 + part2._2)
}
)
val rdd2: RDD[(String, Int)] = rdd1.mapValues({ case (x, y) => {
x / y
}
})
可以用combineByKey代替:
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 3), ("b", 1), ("b", 5)), 2)
/**
* 需要三个参数
* 第一个参数:将相同key的第一个数据进行结构转换
* 第二个参数:分区内的计算规则
* 第三个参数:分区间的计算规则
*/
val rdd2: RDD[(String, (Int, Int))] = rdd.combineByKey(x => (x, 1),
(init_tuple: (Int, Int, ), value) => {
(init_tuple._1 + value, init_tuple._2 + 1)
}
, (part1: (Int, Int), part2: (Int, Int)) => {
(part1._1 + part2._1, part1._2 + part2._2)
}
)
- join、leftOuterJoin、rightOuterJoin,cogroup(这个不发散,会将两个rdd中的数据变成两个迭代器iter)
案例、不同省份的广告点击量前三的城市
// 时间戳 省份 城市 用户 广告
// 1635304263 广东 深圳 yan2 N
val rdd: RDD[String] = sc.textFile("C:\\Users\\93134\\Desktop\\a.txt")
//1635304263 广东 深圳 yan2 N
val mapRdd: RDD[((String, String), Int)] = rdd.map(line => {
val datas: Array[String] = line.split(" ")
((datas(1), datas(2)), 1)
})
// ((江西,南昌),115)
val reduceRdd: RDD[((String, String), Int)] = mapRdd.reduceByKey(_ + _)
// (江西,(南昌,115))
val mapRdd2: RDD[(String, (String, Int))] = reduceRdd.map({
// 不建议用_的方式,用模式匹配可以直接使用
case ((province, city), sum) => {
(province, (city, sum))
}
})
// (江西,CompactBuffer((江西,(吉安,104)), (江西,(赣州,121)), (江西,(南昌,115))))
val groupRdd: RDD[(String, Iterable[(String, Int)])] = mapRdd2.groupByKey()
// (江西,List((赣州,121), (南昌,115)))
val resultRdd: RDD[(String, List[(String, Int)])] = groupRdd.mapValues(values => {
values.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
})
resultRdd.collect().foreach(println)
sc.stop()