说明
- RDD中的元素按照key指定的分区规则进行分区。
- RDD中的元素必须是键值对类型。
- 如果原有的partitionRDD和现有的partitionRDD一致的话就不进行分区,否则会发生shuffle。
函数签名
代码示例(默认分区器)
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
val sc = new SparkContext(conf)
// 按照key分区,所以数据必须是k-v键值对类型
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("d", 4)), 2)
println("-------------------重新分区前--------------------")
rdd.mapPartitionsWithIndex {
(index, datas) => {
println(index + "--->" + datas.mkString(","))
datas
}
}.collect()
println("-------------------重新分区后--------------------")
//按照哈希值进行分区
val newRDD: RDD[(String, Int)] = rdd.partitionBy(new HashPartitioner(3))
newRDD.mapPartitionsWithIndex {
(index, datas) => {
println(index + "--->" + datas.mkString(","))
datas
}
}.collect()
sc.stop()
代码实现(自定义分区器)
class MyPartitioner(partitions: Int) extends Partitioner {
// 分区数量
override def numPartitions: Int = partitions
// 分区逻辑 返回值为分区编号
override def getPartition(key: Any): Int = {
// 将key的类型转为String类型
val k: String = key.asInstanceOf[String]
if (k.startsWith("136")) 0
else if (k.startsWith("137")) 1
else if (k.startsWith("138")) 2
else 3
}
}
// 调用自定义分区器
val rdd: RDD[(String, String)] = sc.makeRDD(List(("13698624174", "河北"), ("13766887551", "广东"),
("13876543211", "上海"), ("17677885551", "河南")), 2)
println("-------------------重新分区前--------------------")
rdd.mapPartitionsWithIndex {
(index, datas) => {
println(index + "--->" + datas.mkString(","))
datas
}
}.collect()
println("-------------------重新分区后--------------------")
val newRDD: RDD[(String, String)] = rdd.partitionBy(new MyPartitioner(4))
newRDD.mapPartitionsWithIndex {
(index, datas) => {
println(index + "--->" + datas.mkString(","))
datas
}
}.collect()