关于Spark默认并行度spark.default.parallelism的理解

spark.default.parallelism是指RDD任务的默认并行度,Spark中所谓的并行度是指RDD中的分区数,即RDD中的Task数。

当初始RDD没有设置分区数(numPartitions或numSlice)时,则分区数采用spark.default.parallelism的取值。

Spark作业并行度的设置代码如下:

val conf = new SparkConf()
  .set("spark.default.parallelism", "500")

对于reduceByKey和join这些分布式shuffle算子操作,其reduce端的stage默认取spark.default.parallelism这个配置项的值作为分区数,如果没有配置,则以map端的最后一个RDD的分区数作为其分区数,那么分区数就决定了reduce端的task的个数。

对于没有shuffle的算子,假设没有执行repartition和coalesce算子进行重分区,则分区个数不变,即RDD的分区数和父RDD的分区数相同。

对于没有父RDD的的算子,在创建RDD又没有设置分区数时,比如parallelize(或makeRDD),默认并行度依赖Spark运行的模式。

(1)local模式

默认并行度取决于本地机器的核数,即

  • local: 没有指定CPU核数,则所有计算都运行在一个线程当中,没有任何并行计算
  • local[K]:指定使用K个Core来运行计算,比如local[2]就是运行2个Core来执行
  • local[*]: 自动帮你按照CPU的核数来设置线程数。比如CPU有4核,Spark帮你自动设置4个线程计算。

(2)集群模式

集群模式包含Stanalone、Yarn模式,Mesos的默认并行度为8

默认并行度取决于所有executor上的总核数与2的最大值,比如集群模式的设置如下:

--num-executors 5
--executor-cores 2

上面配置Executor的数量为5,每个Executor的CPU Core数量为2,executor上的总核数10,则默认并行度为Max(10,2)=10

注意,上面只是默认并行度(defaultParallelism)的取值,并不一定是RDD最终的分区数。具体来说,对于从集合中创建的RDD,其最终分区数等于defaultParallelism,但是从外部存储系统的数据集创建创建的RDD,其最终的分区数需要文件的总大小计算得到。

下面给出官方对于spark.default.parallelism的解释

Meaning(含义)

Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user.

Default(默认值)

For distributed shuffle operations like reduceByKey and join, the largest number of partitions in a parent RDD. For operations like parallelize with no parent RDDs, it depends on the cluster manager:

  • Local mode: number of cores on the local machine
  • Mesos fine grained mode: 8
  • Others: total number of cores on all executor nodes or 2, whichever is larger
上一篇:SparkMLlib的线性回归和逻辑回归


下一篇:Union