package com.shujia.spark.core import org.apache.spark.rdd.RDD import org.apache.spark.{Partitioner, SparkConf, SparkContext} object Demo13Patition { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() .setMaster("local") .setAppName("partition") .set("spark.default.parallelism", "23") //shuffle 之后默认并行度 val sc = new SparkContext(conf) /** * 分区生产规则 * 1、默认一个block对应一个分区, 一个task处理128M的数据 * 2、可以设置最小分区数,实际分区数会根据文件数量进行计算,保证文件能被分开 * 3、如果block的数量比最小分区数大,以block数量为准 */ val linesRDD: RDD[String] = sc.textFile("data/words", 2) println("linesRDD分区数据:" + linesRDD.getNumPartitions) //没有shuffle算子生成的rdd分区数等于上一个rdd的分区数 val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(",")) println("wordsRDD分区数:" + wordsRDD.getNumPartitions) /** * shuufle 之后rdd分区数 * 1、如果不指定默认等于前一个rdd分区数 * 2、可以手动执行分区数 (numPartitions) * 3、设置默认并行度spark.default.parallelism * * 优先级 * 手动指定---> spark.default.parallelism ---> 前一个rdd分区数 * * * 分区数越多--> task 越多---> 计算并行度越高---> 任务越快( 导致产生很多小文件,浪费计算资源) */ val groupByRDD: RDD[(String, Iterable[String])] = wordsRDD.groupBy((w: String) => w, 100) println("groupBy分区数:" + groupByRDD.getNumPartitions) //groupByRDD.foreach(println) val myParttionRDD: RDD[(String, Iterable[String])] = wordsRDD.groupBy((w: String) => w, new MyPartition) println("myParttionRDD分区数:" + myParttionRDD.getNumPartitions) /** * repartition: 没有实际的业务逻辑,只是修改rdd分区数据,但是会产生shuffle * repartition : 既可以提高分区也可以减少分区 * * coalesce: 修改分区数据,如果不产生shuufle ,不能用于提高分区数据 * * coalesce(shuffle=false): 一般用于合并小文件,不产生shuffle ,效率高 * */ val rePartitionRDD: RDD[(String, Iterable[String])] = myParttionRDD.repartition(1000) println("rePartitionRDD分区数据:" + rePartitionRDD.getNumPartitions) val coalesceRDD: RDD[(String, Iterable[String])] = rePartitionRDD.coalesce(10, false) println("coalesceRDD分区数:" + coalesceRDD.getNumPartitions) /* while (true) { }*/ } } /** * 自定义分区,默认是hash分区 * */ class MyPartition extends Partitioner { //指定rdd分区数 override def numPartitions: Int = 100 /** * spark 在shuffle的时候会调用这个方法来获取分区数 * */ override def getPartition(key: Any): Int = { ///hash 分区 math.abs(key.hashCode()) % numPartitions } }
自定义分区
package com.shujia.spark.core import org.apache.spark.rdd.RDD import org.apache.spark.{Partitioner, SparkConf, SparkContext} object Demo14StudentPatition { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() .setMaster("local") .setAppName("partition") val sc = new SparkContext(conf) val student: RDD[String] = sc.textFile("data/students.txt", 2) println("student的分区数:" + student.getNumPartitions) /** * * 将文科和理科分别保存到不同的文件中 * 就要分到不同的reduce中去 * 一个reduce会生成一个文件 * * */ val clazzPationRDD: RDD[(String, Iterable[String])] = student.groupBy((stu: String) => stu,new ClassPartition) println("clazzPationRDD的分区数:"+clazzPationRDD.getNumPartitions) clazzPationRDD.map(_._1).saveAsTextFile("data/clazz") } } class ClassPartition extends Partitioner{ override def numPartitions: Int = 2 override def getPartition(key: Any): Int = { val clazz: String = key.toString.split(",")(4) if (clazz.startsWith("文科")){ 0 }else{ 1 } } }