文章目录
Spark之所以能进行高性能的查询计算,主要得益于其基于内存的计算模型,那么在讨论Spark 中的一系列OOM error之前,我们应该对Spark的内存模型有一个充分的了解(请参考: Spark内存模型),才能分析定位OOM出现的原因。Spark内存模型图贴在这里便于参考:
了解了Spark的内存模型之后,我们回到Spark OOM error上。当Spark应用程序因为OOM error中断时,我们大概率会在报错中看到诸如下面这样的信息:
java.lang.OutOfMemoryError: Java heap space
首先,我们要清楚的是根据Spark的运行架构可知,Spark OOM主要发生在两个地方:Driver端和Executor端。
Driver端OOM Error
Driver端内存是通过配置“spark.driver.memory”来指定的,默认1g。注:在client模式下,此配置不能在应用程序中通过SparkConf设定,因为在SparkConf初始化之前,Driver JVM已经启动了。这时,可以通过在spark-submit命令行指定–driver-memory来设定,或者配置到Spark的配置文件(conf/spark_default.conf)中。
通常情况下,导致Driver端OOM的原因有以下两种:
1. 不适合的API调用
比如,在大数据集(RDD/Dataset/DataFrame)上调用了collect()、collectAsList()、take()、takeAsList()等方法。这些方法会将所有要返回的数据从所有的Executor上发送到Driver端,如果返回的数据超过了Driver内存,就会引发OOM Error。
针对这种情况,我们有以下3中解决方法:
- 最简单粗暴的就是增加Driver端内存。
- 在RDD/Dataset上调用repartition()方法,将数据交给Executor上的一个任务处理。因为一般来讲,Executor会设置较多的内存。
- 可以设置“spark.driver.maxResultSize”(默认1g)来避免Driver出现OOM errors。
2. 广播了大变量
将RDD/Dataset进行广播时,会先将它们发送到Driver,然后由Driver端分发给每个Executor,如果我们的应用程序中广播了多个大变量,超出了Driver内存限制,就可能造成OOM Error。
针对这种情况,我们有以下2中解决方法:
- 增加Driver端内存。
- 通过配置“spark.sql.autoBroadcastJoinThreshold”减小要广播的变量的大小。
Executor端OOM Error
Executor端内存是通过配置“spark.executor.memory”来指定的,默认1g。
Executor端OOM Error原因:
1. 低效的查询
比如,在列式存储格式的表上执行select * from t返回不必要的列;没有提前使用过滤条件过滤掉不必要的数据。
所以,我们在查询的时候,要尽量将过滤条件前置;要尽量只读需要的列;分区表上只查询指定分区的数据等等。
2. 不合适的Driver端和Executor端内存
每个Spark应用程序所需的内存都是不一样的,因此我们要结合所处理的数据的大小和应用监控(比如,Spark Web UI)来合理的为每个应用配置合适的Driver端和Executor端内存大小。
3. 不合适的YARN Container内存
当我们的Spark任务运行在YARN上的时候,可能会因为container内存负载造成OOM Error,出现类似如下的报错:
......is running beyond physical memory limits. Current usage: xxxGB of xxxGB physical memory used; xxxGB of xxxGB virtual memory used. Killing container.....
这种情况我们需要给配置spark.yarn.executor.memoryOverhead(Spark 2.3.0之前)或spark.executor.memoryOverhead(Spark 2.3.0及之后)设置一个合适的值。此配置代表分配给每个Executor的非堆内存空间,这些内存空间主要用于VM开销、interned strings以及其他本地开销,默认值为executorMemory * 0.10,最小不能低于384MB。
4. 内存中缓存大量数据
如果Spark应用程序指定在内存中缓存了大的RDD/Dataset,或者是缓存了较多的RDD/Dataset,就有可能发生OOM Error:
val df: Dataset[Row] = spark.read.table("")
df.persist(StorageLevel.MEMORY_ONLY) //Dataset全部缓存到内存中
val rdd: RDD[Int] = ...
rdd.cache() //RDD全部缓存到内存中
Spark内存模型中,Execution内存和Storage内存的总大小为0.6*(heap space - 300MB),这总大小里面两者默认各占一半,我们应用中缓存的哪些数据集正是存储在Storage内存区域的。如果Spark应用程序中计算较少,那么我们可以通过spark.memory.storage适当调大Storage内存,或者通过配置spark.memory.fraction整体调大两者的总内存。
5. 不合适任务并行度
假如我们的Spark应用程序中要读取的表或文件数据量比较大,而我们没有配置合适的内存大小,并且分配了比较高的并发度和CPU核数,这些数据要在内存中计算,这就可能会造成Executor端OOM。
如果任务并行度较低,而某个任务又被分配了较多的数据,也会造成OOM。
参考
- https://spark.apache.org/docs/latest/configuration.html
- https://spark.apache.org/docs/2.2.0/running-on-yarn.html
- https://unraveldata.com/common-reasons-spark-applications-slow-fail-part-1/
- https://medium.com/swlh/spark-oom-error-closeup-462c7a01709d