spark 之所以需要调优,一是代码执行效率低,二是经常 OOM
内存溢出
内存溢出无非两点:
1. Driver 内存不够
2. Executor 内存不够
Driver 内存不够无非两点:
1. 读取数据太大
2. 数据回传
Executor 内存不够无非两点:
1. map 类操作产生大量数据,包括 map、flatMap、filter、mapPartitions 等
2. shuffle 后产生数据倾斜
Executor 内存不够
有个通用的解决办法就是增加 Executor 内存
--executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).
但这并不一定是最好的办法
map 过程产生大量对象
造成 Executor 内存溢出
解决思路是减少每个 task 的大小,从而减少每个 task 的输出;
具体做法是在 会产生大量对象的 map 操作前 添加 repartition(重新分区) 方法,分区成更小的块传入 map
rdd.flatMap(lambda x: ['%d'%x*50 for _ in range(100000000)]).count() # 100 * 100000000 个对象,内存溢出 rdd.flatMap(lambda x: len(['%d'%x*50 for _ in range(100000000)])).sum() # 内存溢出 rdd.repartition(1000000).flatMap(lambda x: ['%d'%x*50 for _ in range(100000000)]).count() # 可执行
数据倾斜
参考我的博客 数据倾斜
standalone 模式资源分配不均
该模式下配置了
--total-executor-cores NUM (Total cores for all executors.) 集群 executor 核数
--executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G). 每个 executor 内存
而没有配置
--executor-cores NUM Number of cores per executor. (Default: 1 in YARN mode,
or all available cores on the worker in standalone mode) 每个 executor 核数
假如各个 executor 核数不一样,核数多的 executor 执行的 task 就多,内存就容易溢出
解决方法是配置参数 --executor-cores,或者是在 spark 中配置 spark.executor.cores
在 RDD *用对象
rdd = sc.parallelize(range(100)) def myfunc(x): return x rdd.flatMap(lambda x: [('k', 'v') for _ in range(200000000)]).foreach(myfunc) # 每次生成一个 tuple 对象,内存溢出 rdd.flatMap(lambda x: ['k'+'v' for _ in range(2000000)]).count() # 无需生成新的 string 对象,可执行
tuple 为不可变对象,不过字符串也是可变对象
此条方法有待进一步验证
Driver 中需要读取大量数据
造成 Driver 内存溢出
解决思路是增加 Driver 内存,具体做法为设置参数
--driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
示例
from pyspark import SparkContext sc = SparkContext(master='yarn') rdd = sc.parallelize(range(300000000)) # spark-submit --master yarn-client --driver-memory 512M driver_oom.py 内存溢出 # spark-submit --master yarn-client --driver-memory 3G driver_oom.py 可以执行
collect
大量数据回传 Driver,造成内存溢出
解决思路是分区输出,具体做法是 foreach
rdd = sc.parallelize(range(100)) rdd.flatMap(lambda x: ['%d'%x*50 for _ in range(100000)]).collect() # 内存溢出 def func(x): print(x) rdd.flatMap(lambda x: ['%d'%x*50 for _ in range(100000)]).foreach(func) # 分区输出
或者增加 Driver 内存
代码优化
mapPartitions
1. 批处理
2. 减少中间输出
用 mapPartitions 替代多个 map,减少 Executor 内存压力
from pyspark import SparkContext sc = SparkContext(master='yarn') data = range(10) rdd = sc.parallelize(data, 2) ##### map rdd.map(lambda x: x % 3).filter(lambda x : x>1 ).countByValue().values() # [3] ##### mapPartitions # 避免了中间 RDD 的产生,节约内存,防止 oom def myfunc(datas): # datas type is itertools.chain for data in datas: value = data % 3 if value > 1: yield value print rdd.mapPartitions(myfunc).countByValue().values() # [3] # spark-submit --master yarn-client mapVSmapPartitions.py python 只支持 client 模式
DataFrame 代替 RDD
任务被划分成多个 stage,在每个 stage 内部,RDD 是无法自动优化的,
rdd.map(lambda x: x+1).map(lambda x: x+1) == rdd.map(lambda x: x+2)
如上面两个操作是等价的,但是 RDD 并不会自动优化,
而 DataFrame 使用 sql 查询,自带 sql 优化器,可自动找到最优方案
broadcast join
在分布式计算中,数据跨节点移动是非常影响性能的,网络传输耗时,多次传输消耗内存,broadcast 在某些场景可以减少数据移动;
如 一个 小RDD 要和 一个 大RDD 进行 join 操作,常规情况下要互传 RDD,由于多个 task,故需多次传输, 【注意必须是有个小 RDD,否则这种做法意义不大,因为后面要遍历这个广播变量】
如果把 小RDD 变成 broadcast 变量,就不用传输 大RDD,把 broadcast(小RDD) 缓存在对应 Executor 上即可
对 大RDD 进行 map 操作,在 map 函数中调用 小RDD 的 value,遍历 小RDD
map(lambda x: i for i in smallRDD.value if x == i)
filter 之后再 join
就是所谓的谓词下推,在 sparkSQL 中会自动这么操作;
如果是自己操作 RDD,可以减少 shuffle 的数据量
cache and persist
缓存 RDD 既可以节省内存,也可以提高性能;
cahce 是缓存到内存,等同于 persist(Storage.MEMORY_ONLY),在内存不足时,这种缓存方式会丢失数据,再次使用时会重新计算;
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) 在内存不足时会写到磁盘,避免重复,只是耗费一点 IO 时间
combineByKey
在 hadoop 中也有 combine,有 combine 比 没有combine 效率高;
比如 reduceByKey (combine操作) 就比 groupyByKey (非combine操作) 效率高
import time from pyspark import SparkContext sc = SparkContext(master='yarn') strs = list('abcd')*10000000 rdd = sc.parallelize(strs) time.clock() print rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y).collect() # combinByKey 操作耗时少3.2s # print rdd.map(lambda x: (x, 1)).groupByKey().mapValues(sum).collect() # 非 combinByKey 操作耗时3.6s # 二者结果一样 print(time.clock()) strs = list('abcd')*10000000 for i in strs:i = (i, 1) # 6s,单机for循环做更少的事情,耗时更多
图解如下
参考资料:
https://blog.csdn.net/yhb315279058/article/details/51035631