Spark版本:1.6.2
spark-submit提供了在所有集群平台提交应用的统一接口,你不需要因为平台的迁移改变配置。Spark支持三种集群:Standalone、Apache Mesos和Hadoop Yarn。
绑定应用程序依赖库
如果你的应用程序依赖其他项目,需要将其一起打包,打包时需要包括依赖的第三方库。sbt和maven都有装配插件,可以指定hadoop和spark版本,而不将其打入jar包中,因为hadoop和spark的库由集群环境提供。然后通过spark安装目录下的spark-submit工具提交你的应用程序。
对于python程序,需要添加--py-files参数,若有多个Python文件,推荐将其打包zip或egg,然后执行。
spark-submit提交应用
spark-submit支持对三种集群提交应用,主要语法如下:
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
参数详解:
-
--class: 应用程序的入口,例如
org.apache.spark.example.SpariPi
- --master: 指定集群类型,例如local(本地)、spark://master:7077(stanalone模式)、yarn-client
- --deploy-mode: 是否将Driver部署到worker节点,默认是在client
- --conf: 配置spark环境,在引号中使用key=value形式
- appliaction-jar: 指定应用程序的jar包
- application-arguments: 应用程序的参数
还有一些针对各个集群平台的非通用的设置,例如使用Spark standalone cluster时,可以配置--supervise
参数,确保driver在返回值为非零时,自动重启。下面是一些常用的配置用例:
# Run application locally on 8 cores
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[8] \
/path/to/examples.jar \
100
# Run on a Spark standalone cluster in client deploy mode
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000
# Run on a Spark standalone cluster in cluster deploy mode with supervise
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--deploy-mode cluster \
--supervise \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000
# Run on a YARN cluster
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \ # can be client for client mode
--executor-memory 20G \
--num-executors 50 \
/path/to/examples.jar \
1000
# Run a Python application on a Spark standalone cluster
./bin/spark-submit \
--master spark://207.184.161.138:7077 \
examples/src/main/python/pi.py \
1000
# Run on a Mesos cluster in cluster deploy mode with supervise
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master mesos://207.184.161.138:7077 \
--deploy-mode cluster \
--supervise \
--executor-memory 20G \
--total-executor-cores 100 \
http://path/to/examples.jar \
1000
Master URLs
下面对spark-submit的--master参数进行介绍,主要包含以下几种类型:
Master URL | 简介 |
---|---|
local | 在本地使用一个worker线程运行spark |
local[K] | 在本地使用K个worker线程运行spark |
local[*] | 在本地运行CPU核心数个worker线程 |
spark://HOST: PORT | 连接到Spark standalone集群的master节点,默认port是7077 |
mesos://HOST: PORT | 连接到mesos集群,默认port是5050 |
yarn | 连接到yarn集群,通过--deploy-mode指定yarn-client和yarn-cluster两种模式。集群的位置通过HADOOP_CONF_DIR或YARN_CONF_DIR变量配置 |
通过文件加载配置
Spark可以通过配置文件或应用代码、或者spark-submit参数加载相关的配置。默认情况下,spark读取conf/spark-defaults.conf配置。默认的spark配置参见下一文档。
若是通过代码设置spark.master参数,则--master参数会被忽略。一般来说,可以通过SparkConf配置的属性优先级最高,其次是spark-submit的属性,最后是配置文件。代码优先级 > spark-submit参数 > 配置文件。
先进的依赖管理
spark-submit的**--jars**选项会根据集群不同选择不同的处理策略。spark支持以下几种URL模式,并使用不同策略:
- file: 绝对的文件路径,各个worker通过http服务从driver节点copy文件;
- hdfs:http: https ftp: 通过相应的协议拉取jar文件到本地;
- local: 这种URL代表在每个worker的本地路径下都已经存在该文件,不会触发网络IO
由于每个worker都会拷贝文件到本地,如何清理是个问题。yarn会自动定期处理,spark standalone集群可以配置spark.worker.cleanup.appDataTtl
配置保存的时间,默认是7天。
用户还可以通过--packages
包含其他的依赖,这些依赖库的传播依赖也会被包含。--repositories
可以包含额外的库仓储。这些参数在pyspark, spark-shell, spark-submit
中都支持。
spark测试RDD所占存储
(获取部分记录,并根据RDD记录数对RDD所占空间进行预估):
def getTotalSize(rdd: RDD[Row]): Long = {
// This can be a parameter
val NO_OF_SAMPLE_ROWS = 10l;
val totalRows = rdd.count();
var totalSize = 0l
if (totalRows > NO_OF_SAMPLE_ROWS) {
val sampleRDD = rdd.sample(true, NO_OF_SAMPLE_ROWS)
val sampleRDDSize = getRDDSize(sampleRDD)
totalSize = sampleRDDSize.*(totalRows)./(NO_OF_SAMPLE_ROWS)
} else {
// As the RDD is smaller than sample rows count, we can just calculate the total RDD size
totalSize = getRDDSize(rdd)
} totalSize
} def getRDDSize(rdd: RDD[Row]) : Long = {
var rddSize = 0l
val rows = rdd.collect()
for (i <- 0 until rows.length) {
rddSize += SizeEstimator.estimate(rows.apply(i).toSeq.map { value => value.asInstanceOf[AnyRef] })
} rddSize
}
更多信息
当部署好应用程序后,集群模式概述对分布式执行、如何监控和调试程序进行了阐述。