==> mapPartitionsWithIndex
---> 定义: def mapPartitionsWithIndex[U](f:(Int, Iterator[T]) => Iterator[U], preserversPartitioning: Boolean = false)
---> 作用: 对 RDD 每个分区进行操作,带有分区号
---> 示例:输出分区号和内容
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// 创建一个RDD val rdd 1 = sc.parallelize(List( 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 ))
// 创建一个函数,作为 f 的值 def func(index : Int, iter : Iterator[Int]) : Iterator[String] = {
iter.toList.map(x = > "[PartID: " + index + ", value= " + x + "]" ).iterator
} // 调用 rdd 1 .mapPartitionsWithIndex(func).colect
// 结果 res 15 : Array[String] = Array([PartitionID : 0 ,value = 1 ], [PartitionID : 0 ,value = 2 ], [PartitionID : 0 ,value = 3 ], [PartitionID : 0 ,value = 4 ],
[PartitionID : 1 ,value = 5 ], [PartitionID : 1 ,value = 6 ], [PartitionID : 1 ,value = 7 ], [PartitionID : 1 ,value = 8 ], [PartitionID : 1 ,value = 9 ])
|
==> aggregate
---> 定义:def aggregate[U: ClassTag](zeroValue: U)(seqOp:(U, T) => U, combOp: (U, U) => U): U
---- (zeroValue: U) 初始值
---- seqOp:(U, T) => U 局部操作
---- combOp:(U, U) => U 全局操作
---> 作用:先对局部进行操作,再对全局进行操作
---> 示例:
1
2
3
4
|
// 求两个分区最大值的和,初始值为0 val rdd 1 = sc.parallelize(List( 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 ))
rdd 1 .aggregate( 0 )(math.max( _ , _ ), _ + _ )
// 结果为:res16: Int = 13 |
==> aggregateByKey
---> 定义:
---> 作用:对 key-value 格式 的数据进行 aggregate 操作
---> 示例:
1
2
3
4
5
6
7
8
9
10
11
|
// 准备一个 key-value 格式的 RDD val parRDD = sc.parallelize(List(( "cat" , 2 ),( "cat" , 5 ),( "mouse" , 4 ),( "cat" , 12 ),( "dog" , 12 ),( "mouse" , 2 )), 2 )
// 计算每个分区中的动物最多的个数求和 parRDD.aggregateByKey( 0 )(math.max( _ , _ ), _ + _ )
// 结果为: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6)) // 计算每种动物的总数量 parRDD.aggregateByKey( 0 )( _ + _ , _ + _ ).collect // 方法一
parRDD.reduceByKey( _ + _ ).collect
|
==> coalesce 与 repartition
---> 作用:将 RDD 中的分区进行重分区
---> 区别: coalesce 默认不会进行 shuffle(false)
repartition 会进行 shuffle(true), 会将数据真正通过网络进行重分区
---> 示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
// 定义一个 RDD val rdd = sc.parallelize(List( 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 ), 2 )
// 显示分区中的分区号和分区号中的内容 def func(index : Int, iter : Iterator[Int]) : Iterator[String] = {
iter.toList.map(x = > "[PartID: " + index + ", value= " + x + "]" ).iterator
} // 查看 rdd 中的分区情况 rdd.mapPartitionsWithIndex(func).collect // 结果为: Array[String] = Array( // [PartID: 0, value= 1], [PartID: 0, value= 2], [PartID: 0, value= 3], [PartID: 0, value= 4], // [PartID: 1, value= 5], [PartID: 1, value= 6], [PartID: 1, value= 7], [PartID: 1, value= 8]) // 使用 repartition 将分区数改为3 val rdd 2 = rdd 1 .repartition( 3 )
val rdd 3 = rdd 1 .coalesce( 3 , true )
// 查看rdd2 与rdd3 的分区情况 rdd 2 .mapPartitionsWithIndex(func).collect
rdd 3 .mapPartitionsWithIndex(func).collect
// 结果为:Array[String] = Array( // [PartID: 0, value= 3], [PartID: 0, value= 6], // [PartID: 1, value= 1], [PartID: 1, value= 4], [PartID: 1, value= 7], // [PartID: 2, value= 2], [PartID: 2, value= 5]) |