自适应查询执行AQE:在运行时加速SparkSQL

一、自适应查询执行AQE简介

关于自适应查询执行,在数据库领域早有充分研究。在Spark社区,最早在Spark 1.6版本就已经提出发展自适应执行(Adaptive Query Execution,下文简称AQE);到了Spark 2.x时代,Intel大数据团队进行了相应的原型开发和实践;到了Spark 3.0时代,Databricks和Intel一起为社区贡献了新的AQE。

什么是AQE呢?简单来说就是根据在运行时统计信息(runtime statistics)在查询执行的过程中进行动态(Dynamic)的查询优化。那么我们为什么需要AQE呢?在Spark 2.x时代,为了选择最佳执行计划,我们引入了CBO(Cost-based optimization),但是在一些场景下,效果非常不好,缺点明显,比如:

  • 统计信息过期或者缺失导致估计错误;
  • 收集统计信息代价较大(比如column histograms);;
  • 某些谓词使用自定义UDF导致无法预估;
  • 手动指定执行hint跟不上数据变化。

而在Spark 3.0时代,AQE完全基于精确的运行时统计信息进行优化,引入了一个基本的概念Query Stages,并且以Query Stage为粒度,进行运行时的优化,其工作原理如下所示:
自适应查询执行AQE:在运行时加速SparkSQL

整个AQE的工作原理以及流程为:

  1. 运行没有依赖的stage;
  2. 在一个stage完成时再依据新的统计信息优化剩余部分;
  3. 执行其他已经满足依赖的stage;
  4. 重复步骤(2)(3)直至所有stage执行完成。

二、Spark 3.0中主要的AQE特性

Spark 3.0中主要的AQE特性包括:

  • 动态合并shuffle分区;
  • 动态转换join策略;
  • 动态优化join中的数据倾斜。

(一)动态合并shuffle分区

Shuffle分区数量和大小对查询性能很关键。在Spark 3.0以前,Shuffle分区是一个固定值,存在着明显的缺点,如果分区过小会导致I/O低效、调度开销和任务启动开销,但是如果分区过大又会带来GC压力和溢写硬盘等问题。另一方面,在Spark 3.0之前,整个查询执行过程中使用统一的分区数,而在查询执行的不同阶段,数据规模会发生明显变化,如果保持统一的分区数,则大大降低了效率。基于以上,动态合并Shuffle分区是非常必要的。

AQE解决上面问题的具体做法是设置较大的初始分区数来满足整个查询执行过程中最大的分区数,并且在每个Query stage结束的时候按需自动合并分区,其具体的流程如下图所示:
自适应查询执行AQE:在运行时加速SparkSQL

具体来说,动态合并Shuffle分区的原理如下:

对于普通的Shuffle来说,没有自动合并的过程,每个MAP读取Shuffle后,会根据指定分区数进行分区,比如下图为5:
自适应查询执行AQE:在运行时加速SparkSQL

进行上图所示的分区后发现,REDUCE1和REDUCE5要处理的数据量明显高于其余三个REDUCE,而我们理想的情况下是每个REDUCE处理的数据量是相当的,所以AQE进行了动态合并分区,将相邻的小分区2,3,4进行合并,输出三个REDUCE,大大提高了后续的效率,如下图所示:

自适应查询执行AQE:在运行时加速SparkSQL

(二)动态转换join策略
在Spark中,我们希望当Join的某一边可以完全放入内存时,Spark选择Broadcast Hash Join,但是实际上会出现预估可能不够准确,导致本来可以优化为BHJ的没有被优化的情况,原因也很多,比如;

  • 统计信息不够准确;
  • 子查询太复杂;
  • 黑盒的谓词,比如自定义UDF。

对于以上问题,AQE的解决方法就是使用运行时数据大小重新选择执行计划,其整个流程与原理如下图所示:
自适应查询执行AQE:在运行时加速SparkSQL

(三)动态优化join中的数据倾斜

在Join中的数据倾斜会导致一系列的问题,比如性能下降、某一个task影响整个stage的运行等,处理数据量比较大的partitions时候还可能会出现溢写磁盘的情况。AQE针对上述问题使用运行时的统计信息自动优化查询执行,动态的发现倾斜数据的数量,并且把倾斜的分区分成更小的子分区来处理。其做法如下图所示:
自适应查询执行AQE:在运行时加速SparkSQL

具体来说其原理如下:
对于普通的sort merge join来说,没有倾斜优化,可能会造成某个Shuffle分区的数据数量明显高于其他分区,如下图中的PART.A0,这种情况会造成A0和B0的这个Join执行速度明显慢于其他的Join。

自适应查询执行AQE:在运行时加速SparkSQL

有了AQE之后,根据数据倾斜优化后的sort merge join,使用skew Shuffle reader,如下图所示将A0分成三个子分区,并将对应的B0复制三份,整个Join任务的运行效率大大提升。

自适应查询执行AQE:在运行时加速SparkSQL

上述的几个特性可以在Demo中查看https://docs.databricks.com/_static/notebooks/aqe-demo.html

三、TPC-DS性能测试

进行TPC-DS性能测试的集群配置如下图所示:
自适应查询执行AQE:在运行时加速SparkSQL

测试结果显示,2条Query获得了1.5倍的性能提升,37条Query获得了1.1倍的性能提升。

自适应查询执行AQE:在运行时加速SparkSQL

下面两张图是关于分区合并和Join策略的性能测试结果,可以看出AQE对于性能的提升还是非常明显的。

自适应查询执行AQE:在运行时加速SparkSQL
自适应查询执行AQE:在运行时加速SparkSQL

除了在TPC-DS的测试中AQE表现优秀,在实际生产环境中AQE对于性能的提升也非常优秀,比如某电商公司分享在某些典型的倾斜查询中使用了AQE之后获得了十几倍的性能提升,某互联网巨头使用了AQE之后发现在2个典型的查询中性能分别有了5倍和1.38倍的提升等等。

四、Q&A

Q1:Shuffle是如何对大量小文件进行优化的?
A1:AQE 支持的动态分区合并可以减少 shuffle 后的分区数,如果是 ETL 作业写动态分区表,建议手动添加distribute by partkey 等子句来减少输出文件数量。

Q2:AQE是否支持外部的Shuffle Service?
A2:支持,需要 shuffle service 提供基本的统计信息

Q3:如果join的两边的part都比较大,是不是都会拆分?还会broadcast 么?
A3:都比较大的话优化就没啥用了,需要从业务出发进行优化。


关键词:Spark 3.0 、AQE、自适应查询执行、Join、小文件

获取更多 Spark+AI SUMMIT 精彩演讲视频回放,立刻点击前往:
>>SPARK + AI SUMMIT 2020 中文精华版线上峰会 7月4日第一场<<
>>SPARK + AI SUMMIT 2020 中文精华版线上峰会 7月5日第二场<<

上一篇:Django使用ldap认证登录


下一篇:kafka集群部署