极客时间Spark性能调优实战-学习笔记(1)

通用性能调优(一)

一、应用开发三原则

原则一:使用spark自身的调优机制
充分利用 Spark 为我们提供的“性能红利”,如钨丝计划、AQE、SQL functions 等等。

钨丝计划的优势?
1)数据结构:采用紧凑的自定义二进制格式,存储效率高,避免的序列化反序列化。
2)开辟堆外内存来管理对象,对内存的估算更加精确,且避免了反复执行垃圾回收。
3)全阶段代码生成取代火山迭代模型。

AQE的优势?
AQE 可以让 Spark 在运行时的不同阶段,结合实时的运行时状态,周期性地动态调整前面的逻辑计划,然后根据再优化的逻辑计划,重新选定最优的物理计划,从而调整运行时后续阶段的执行方式。

1)自动分区合并
以filter和coalesce为例,当filter后,难免有些分区数据很少,需要我们手动通过coalesce合并。AQE会自动检测过小的分区,自动完成合并。
2)自动处理数据倾斜
自动加盐,完成数据倾斜的处理。
3)join策略自动调整
spark 3.0后可支持动态切换join方式,有一种情况是,其中一个表在排序前需要对数据进行过滤,过滤后的表小到足可以由广播变量容纳。针对这种情况,AQE 会根据运行时的统计数据,去动态地调整 Join 策略,把之前敲定的 Sort Merge Join 改为 Broadcast Join,从而改善应用的执行性能。

如何使用
钨丝计划:很简单,采用 DataFrame 或是 Dataset API 进行开发就可了。
AQE: 设置参数spark.sql.adaptive.enabled设置为true

原则二:能拖则拖
这个好理解,将shuffle操作尽量放在后面处理,因为一般来讲,程序越往后数据量是越少的(filter过滤),shuffle越靠后,自然要shuffle的数据量越少,效率越高。

原则三:跳出单机思维模式
我们需要时刻提醒自己RDD是一个分布式的数据集,我们操作的不是一个本地文件那么简单,一行简单的代码,哪怕你写的位置不一样,可能就会多执行几千万次,对性能是一个不小的开销。

二、硬件参数调优

1、CPU参数

主要包括spark.cores.max、spark.executor.cores 和 spark.task.cpus 这三个参数。分别对应整个集群资源可用的CPU,executor可用的cpu,task可用的cpu数。
CPU数代表最大可用的cpu个数,我们必须设置与此匹配的任务并行度来匹配。一般使用 spark.default.parallelism参数来控制。

2、内存参数
spark在管理模式上分为堆内内存和堆外内存,对外内存分Execution Memory 和 Storage Memory两部分。先把spark.memory.offHeap.enabled 置为 true,然后通过spark.memory.offHeap.size指定对外内存的大小。
堆内内存分了四个区域,也就是 Reserved Memory(预留空间300M)、User Memory(存储用户在driver端自定义的数据结构)、Execution Memory(执行分布式任务的内存) 和 Storage Memory(用于缓存等)(Execution Memory+Storage Memory=spark可用内存)。
极客时间Spark性能调优实战-学习笔记(1)堆外和堆内内存如何权衡?
对于需要处理的数据集,如果数据模式比较扁平,而且字段多是定长数据类型,就更多地使用堆外内存。相反地,如果数据模式很复杂,嵌套结构或变长字段很多,就更多采用 JVM 堆内内存会更加稳妥,这是由于堆外内存采用紧凑的二进制存储格式决定的,若处理的数据集格式复杂,则会有局限性。

User Memory 与 Spark 可用内存如何分配?
spark.memory.fraction这个参数指定了spark中可用的内存比例,那么1-spark.memory.fraction则对应user memory的比例。如果代码中自定义的数据较多,那么可以给user memory多分配点内存,否则则应该将更多的内存给到spark可支配内存。

Execution Memory 该如何与 Storage Memory 平衡?
spark可用内存=Execution Memory+Storage Memory。那么这两部分的内存空间应该如何分配?
通常来说,在统一内存管理模式下,spark.memory.storageFraction 的设置就显得没那么紧要,因为无论这个参数设置多大,执行任务还是有机会抢占缓存内存,而且一旦完成抢占,就必须要等到任务执行结束才会释放。除非你的任务是类似机器学习之类的缓存密集型任务。

3、磁盘参数
spark.local.dir该参数指定的路径用于存放rdd cache和shuffle中间文件。有条件的可以用固态或者内存文件系统来提升IO效率。

三、shuffle类配置项
极客时间Spark性能调优实战-学习笔记(1)
通过以上两个参数可以调整map端和reduce端的缓冲区大小。

还有一个参数是spark.shuffle.sort.bypassMergeThreshold 。当 Reduce 端的分区数小于这个设置值的时候,我们就能避免 Shuffle 在计算过程引入排序

四、sparkSQL大类配置项
AQE的三个特性:分区自动合并、数据倾斜自动处理、join策略动态调整。首先我们要通过sqrk.sql,adaptive.enabled来开启AQE。

1、分区自动合并
极客时间Spark性能调优实战-学习笔记(1)
第一个参数是默认开启的,第二个参数和第三个参数共同影响了分区合并的效果。怎么影响的呢?假设shuffle后数据的大小时20G。假设第三个参数我们设置的200,代表最小分区数,如果咱们的分区数太少也会影响我们的并发效率。那么20G/200=100MB。也就是每个分区的大小大概是100MB.然后我们将100MB和第二个参数设置的数值取一个最小值就是每个分区的目标尺寸。

2、数据倾斜自动处理
极客时间Spark性能调优实战-学习笔记(1)同样第一个参数默认开启。第三个参数是阈值,也就是说分区大小必须大于这个参数才有可能被判定为倾斜分区。但这还不够,还需要结合第二个参数来看。第二个参数什么意思?他是一个比例系数。举个例子,将所有分区大小进行排序,排序后取中位数,分区大小还必须大于中位数乘以这个比例系数才能判断为倾斜分区。也就是必须满足两个条件。第三个参数代表拆分力度。检测到的倾斜分区按照每个分区该参数设置的大小进行拆分。

3、join策略调整。我们指导join策略常用的有广播join,hash join mergejoin等。不同的策略开销也不一样。我们肯定需要选择一种合适的策略来提升join的性能。以前我们可以通过spark.sql.autoBroadcastJoinThreshold参数来设置广播join的阈值,如果有一张表的大小小于该值,便会使用广播join策略。
这种模式是定死的,无法动态平衡。而AQE 会在数据表完成过滤操作之后动态计算剩余数据量,当数据量满足广播条件时,AQE 会重新调整逻辑执行计划,在新的逻辑计划中把 Shuffle Joins 降级为 Broadcast Join。
极客时间Spark性能调优实战-学习笔记(1)不过非空分区比例要小于这个参数才会触发自动join策略调整

上一篇:React 函数式组件


下一篇:Learning Memory-guided Normality for Anomaly Detection论文解析