一个Spark应用运行的过程如下所示:
- Driver
用户的主程序提交到Driver中执行,在Driver中创建SparkContext,SparkContext初始化DAGScheduler和TaskScheduler,作为coordinator负责从AppMaster申请资源,并将作业的Task调度到Executor上面执行。
在yarn-cluster模式下,AppMaster中包含了Driver,在YARN中启动,spark-submit客户端kill掉不影响程序的运行;
在yarn-client模式下,Driver在spark-submit的客户端启动(不在YARN中),跟AppMaster是分离的,spark-submit客户端kill掉会导致Spark程序挂掉(如spark-sql/spark-shell等都是以yarn-client的方式提交)
Executor上面运行的每个MapTask结束后都会有MapStatus汇报给Driver, 当MapTask数量非常多的时候可能会导致Driver出现OOM,此时需要调整Driver的内存大小,通过--conf spark.driver.memory=4G
或者--driver-memory 4G
来进行设置。
- Executor
实际执行Task的节点,Executor的个数由--conf spark.executor.instances=4
或者--num-executors 4
来设置;每个Executor里面并发跑的Task个数由--conf spark.executor.cores=2
或者--executor-cores
指定。
Executor的内存由--conf spark.executor.memory=4G
或者--executor-memory 4G
设置。
Spark内存管理
上面介绍了Spark中两个角色(Driver/Executor),其中Executor是实际运行Task的节点,Spark内存管理主要在Executor上面。
Executor内存使用结构
如上图所示, Spark on YARN模式下一个Executor的内存使用情况:
整个Executor是YARN的一个container,所以它的总内存受yarn.scheduler.maximum-allocation-mb
的参数控制;
当用户提交作业的时候通过spark.executor.memory
参数设置了executor的堆内存(heapsize),这部分内存的使用情况如上图所示:
- 系统预留(固定300MB)
详见SPARK-12081 - spark.memory.fraction
该参数控制executor内用户计算(execution)和存储(storage)总占用多少内存,即(M-R)*spark.memory.fraction
大小的内存; 剩余的(M-R)*(1-spark.memory.fraction)用于Spark内部的metadata以及用户数据结构等使用
对于spark.executor.memroyOverhead
,它是executor可额外使用的堆外(off-heap)内存,比如spark的shuffle过程使用的netty就会使用到堆外内存,如果程序有遇到相关的oom错误,可以尝试调大该参数。该内存不属于上面spark.executor.memory
(on-heap),但是它们的总和不能超过yarn.scheduler.maximum-allocation-mb
.
execution/storage内存管理
上图中execution/storage的内存((M-R)*spark.memroy.fraction
)是Task在executor中运行需要用到的内存,它们通过UnifiedMemoryManager
这个统一内存管理器来管理。
UnifiedMemoryManager
中的execution和storage的管理没有硬性的边界控制(比如execution固定占比多少),它们之间是一个软边界,初始的边界由spark.memory.storageFraction
来设置(默认0.5),但这个并不是一个固定的边界:
a) 当execution不够的时候,可以从storage侧借内存,如storage基本没使用(如没有cache数据等),execution可以从storage借内存甚至全部都借完,即使后续有storage需要用内存也不能强制从execution拿回,除非execution后续自己释放了部分内存,storage才能拿来使用;
b) 当storage不够的时候,如果execution有空闲多余的内存,则也可以借,但是后续如果execution又需要更多内存了则可以强制从storage拿回内存(如可以将storge的数据写到磁盘,然后释放对应的内存),直到storage使用的内存减少到spark.memory.storageFraction
的比例。
Task内存管理
一个Executor可以同时并发执行多个Task(通过spark.executor.cores
控制),而每个Task在运行的过程中都需要从Executor申请内存来使用,那Executor如何将内存分配给并发运行的多个Task呢? 这块留到下一篇文章来介绍。