spark.default.parallelism
是指RDD任务的默认并行度,Spark中所谓的并行度是指RDD中的分区数,即RDD中的Task数。
当初始RDD没有设置分区数(numPartitions或numSlice)时,则分区数采用spark.default.parallelism
的取值。
Spark作业并行度的设置代码如下:
val conf = new SparkConf()
.set("spark.default.parallelism", "500")
对于reduceByKey和join这些分布式shuffle算子操作,其reduce端的stage默认取spark.default.parallelism
这个配置项的值作为分区数,如果没有配置,则以map端的最后一个RDD的分区数作为其分区数,那么分区数就决定了reduce端的task的个数。
对于没有shuffle的算子,假设没有执行repartition和coalesce算子进行重分区,则分区个数不变,即RDD的分区数和父RDD的分区数相同。
对于没有父RDD的的算子,在创建RDD又没有设置分区数时,比如parallelize(或makeRDD),默认并行度依赖Spark运行的模式。
(1)local模式
默认并行度取决于本地机器的核数,即
-
local
: 没有指定CPU核数,则所有计算都运行在一个线程当中,没有任何并行计算 -
local[K]
:指定使用K个Core来运行计算,比如local[2]
就是运行2个Core来执行 -
local[*]
: 自动帮你按照CPU的核数来设置线程数。比如CPU有4核,Spark帮你自动设置4个线程计算。
(2)集群模式
集群模式包含Stanalone、Yarn模式,Mesos的默认并行度为8
默认并行度取决于所有executor上的总核数与2的最大值,比如集群模式的设置如下:
--num-executors 5
--executor-cores 2
上面配置Executor的数量为5,每个Executor的CPU Core数量为2,executor上的总核数10,则默认并行度为Max(10,2)=10
。
注意,上面只是默认并行度(defaultParallelism)的取值,并不一定是RDD最终的分区数。具体来说,对于从集合中创建的RDD,其最终分区数等于defaultParallelism,但是从外部存储系统的数据集创建创建的RDD,其最终的分区数需要文件的总大小计算得到。
下面给出官方对于spark.default.parallelism
的解释
Meaning(含义)
Default number of partitions in RDDs returned by transformations like join
, reduceByKey
, and parallelize
when not set by user.
Default(默认值)
For distributed shuffle operations like reduceByKey
and join
, the largest number of partitions in a parent RDD. For operations like parallelize
with no parent RDDs, it depends on the cluster manager:
- Local mode: number of cores on the local machine
- Mesos fine grained mode: 8
- Others: total number of cores on all executor nodes or 2, whichever is larger