Apache Spark 3.0中的SQL性能改进概览

今天主要跟大家分享一下spark 3.0在SQL方向上的一些优化工作。从spark 2.4开始,大概有超过一年半的时间。对于一个比较活跃的开源项目来说,这个时间是非常长的。所以里面包含了大量的这种功能增强,性能优化,等各方面的新的feature在里面。大概超过50%的相关的issue都是和SQL相关的。在SQL这个方向上主要做的工作,大概分成四个方面。第一方面是工具类的。就是说基于spark的一个开发者怎么去和spark交互,提供一些更多的工具。第二个是dynamic optimization。简单来说就是运行时的优化。在这里面,包含了几个重大的性能改进。第三个是在spark的catalyst优化器方面有很多新的改进。第四个是基础依赖的更新。主要在语言层面引入了一些新的支持和依赖。

Apache Spark 3.0中的SQL性能改进概览

Spark 3.0是一个时间跨度非常长的release,包含了非常多的社区的工作。统计下来有接近3400多个issue在spark 3.0里面进行了处理。针对这么多的issue,我们用spark 3.0的时候,需要考虑有哪些东西对于实际的生产环境可能有好处,有哪些新的特性。

Apache Spark 3.0中的SQL性能改进概览

总结下来,大概可以把在SQL方向上的这种大的改动分成七个部分,分属于上文中提到的四个类别。

Apache Spark 3.0中的SQL性能改进概览

第一部分是new explain format。当我们想去改进,去优化一个spark SQL的性能的时候,首先需要去了解SQL的查询计划大概是一个什么样子,有针对性的去进行这种SQL的重写,或其他的一些改进。前提就是我的查询计划可读性比较强,是非常容易去看的。

Apache Spark 3.0中的SQL性能改进概览

对于之前2.4的版本,可以通过explain SQL去展示。只不过是这种展示的方式看起来繁杂一点。我们可以看到针对于SQL,这么一个物理查询计划,是一个树状的结构。也是可以去看的,但是可读性相对来说不够好。

Apache Spark 3.0中的SQL性能改进概览

在3.0里面,针对查询计划的这种展示进行了一定的优化,以简要的格式展示。根据节点的编号,可以找到对应的更详细的信息。而且对于每一个节点展示的信息也做了一些归类和整理,整理成input,output,condition等。通过这种方式,用户可以更加清晰的看到整个的查询计划。

Apache Spark 3.0中的SQL性能改进概览

第二部分是all type of join hints。在spark 2.4只支持broadcast。而spark 3.0除了支持broadcast,还支持sort merge,shuffle hash和cartesian。

Apache Spark 3.0中的SQL性能改进概览

第三部分是adaptive query execution。社区为什么要去做它,最主要的原因就是说,对于一些查询计划,在运行时能够拿到更准确的数据统计信息,可以选择最优的这种计划,对数据进行处理,从而提升spark处理数据的性能。主要包括三种场景。第一种是调整reducer的数量,从而避免额外的内存和IO的开销。第二种是说,选择最合适的join的策略。第三种是说,针对倾斜数据,在join的时候提供更好的处理方式。上述场景都是自动的,根据运行时的情况,自动地收集相关的信息,然后去做判断。

Apache Spark 3.0中的SQL性能改进概览

怎么去动态的调整reducer的数量。在spark 2.4,默认指定partition数量,每一个partition经过shuffle之后,对应的要处理的数据的大小可能是不一样的。这是由数据本身的特性来决定的,它的分布可能本来就是不均衡的。

Apache Spark 3.0中的SQL性能改进概览

在spark 3.0中,在shuffle的时候,每一个partition有不同的数据量大小,需要把小的partition数据进行合并,给同一个reducer去处理,从而使得每一个reducer它所处理的数据量大小是相近的。

Apache Spark 3.0中的SQL性能改进概览

针对有数据倾斜的这种join,在spark 2.4中带来的主要的问题就是说,在处理最大的partition时,要花费很长的时间,影响整个join。

Apache Spark 3.0中的SQL性能改进概览

在spark 3.0中,有数据倾斜的join,比在spark 2.4中更快。如图所示,对于表A和表B,我把大表的数据做切分,小表的数据做全量的分发。第一个,满足join的语义要求。第二个,在倾斜的这些key上面,它是被切成多分,然后在多个task里面去处理。

Apache Spark 3.0中的SQL性能改进概览

第四部分是dynamic partitioning pruning。在join操作中,要避免读取不必要的partition。而dynamic filter能够避免读取不必要的partition。

Apache Spark 3.0中的SQL性能改进概览

如下图所示,在spark 2.4中,大表中的所有数据都被读取。

Apache Spark 3.0中的SQL性能改进概览

而在spark 3.0中,通过pushdown with dynamic filter,能够减少大表中需要被读取的数据量。

Apache Spark 3.0中的SQL性能改进概览

如下图所示,是一个dynamic partitioning pruning的例子。

Apache Spark 3.0中的SQL性能改进概览

第五部分是Enhanced nested column pruning & pushdown,是针对于这种嵌套的数据结构的支持。在spark 2.4里面,其实已经提供了部分的这种支持。如下图所示的表里面,有column 1和column 2,而后者是一个嵌套的数据结构,它里面有两个字段。比如说,我查询的时候只查了column 2里面的第1个字段。去访问这个数据的时候,我只需要把column 2的第1个字段拿出来就行了,而不需要把整个column 2都拿出来。但是在spark 2.4里面它的支持是有限的。就是说,只能穿透有限的几个算子,比如说LIMIT这种算子,对于其他的一些算子是没办法的。

Apache Spark 3.0中的SQL性能改进概览

而在spark 3.0里面,对这一块进行了进一步的优化,能够支持把column pruning推到穿透所有的算子。

Apache Spark 3.0中的SQL性能改进概览

另外一种场景,就是说filter过滤的条件是根据嵌套字段里面的某一个子字段去做过滤,是不是支持把过滤条件也推到table scan里面。在spark 2.4里面也是不能够完全支持的。

Apache Spark 3.0中的SQL性能改进概览

而在spark 3.0里面,针对嵌套字段的filter,也是一直可以往下推到具体访问数据的table scan里面。

Apache Spark 3.0中的SQL性能改进概览

第六部分是Improved aggregation code generation,针对aggregation扩件的一个优化。

Apache Spark 3.0中的SQL性能改进概览

就是说,在spark里面我们去支持这种扩件,但是扩件会有一个限制。针对每个方法,如果大于8000 Java bytecode,HotSpot编译器就rollback,放弃生成native code。所以,如果你的这种SQL比较复杂,可能会没办法利用到扩件的这种特性。

Apache Spark 3.0中的SQL性能改进概览

在spark 3.0里面,针对这种情况做一些优化。简单来说,把一个方法拆分成多个方法,从而避免碰到8000 Java bytecode的限制。

Apache Spark 3.0中的SQL性能改进概览

具体的例子如下图所示。

Apache Spark 3.0中的SQL性能改进概览

第七部分是New Scala and Java,针对新的语言版本的支持。支持了新的Java 11这个版本,以及Scala 2.12版本。

Apache Spark 3.0中的SQL性能改进概览


关键词:Spark 3.0,SQL性能改进,Interactions with developers,Dynamic optimizations,Catalyst improvements,Infrastructure updates

获取更多 Spark+AI SUMMIT 精彩演讲视频回放,立刻点击前往:
>>SPARK + AI SUMMIT 2020 中文精华版线上峰会 7月4日第一场<<
>>SPARK + AI SUMMIT 2020 中文精华版线上峰会 7月5日第二场<<

上一篇:(原创)优酷android客户端 下载中 bug 解决


下一篇:Apache 重定向配置方法