今天主要跟大家分享一下spark 3.0在SQL方向上的一些优化工作。从spark 2.4开始,大概有超过一年半的时间。对于一个比较活跃的开源项目来说,这个时间是非常长的。所以里面包含了大量的这种功能增强,性能优化,等各方面的新的feature在里面。大概超过50%的相关的issue都是和SQL相关的。在SQL这个方向上主要做的工作,大概分成四个方面。第一方面是工具类的。就是说基于spark的一个开发者怎么去和spark交互,提供一些更多的工具。第二个是dynamic optimization。简单来说就是运行时的优化。在这里面,包含了几个重大的性能改进。第三个是在spark的catalyst优化器方面有很多新的改进。第四个是基础依赖的更新。主要在语言层面引入了一些新的支持和依赖。
Spark 3.0是一个时间跨度非常长的release,包含了非常多的社区的工作。统计下来有接近3400多个issue在spark 3.0里面进行了处理。针对这么多的issue,我们用spark 3.0的时候,需要考虑有哪些东西对于实际的生产环境可能有好处,有哪些新的特性。
总结下来,大概可以把在SQL方向上的这种大的改动分成七个部分,分属于上文中提到的四个类别。
第一部分是new explain format。当我们想去改进,去优化一个spark SQL的性能的时候,首先需要去了解SQL的查询计划大概是一个什么样子,有针对性的去进行这种SQL的重写,或其他的一些改进。前提就是我的查询计划可读性比较强,是非常容易去看的。
对于之前2.4的版本,可以通过explain SQL去展示。只不过是这种展示的方式看起来繁杂一点。我们可以看到针对于SQL,这么一个物理查询计划,是一个树状的结构。也是可以去看的,但是可读性相对来说不够好。
在3.0里面,针对查询计划的这种展示进行了一定的优化,以简要的格式展示。根据节点的编号,可以找到对应的更详细的信息。而且对于每一个节点展示的信息也做了一些归类和整理,整理成input,output,condition等。通过这种方式,用户可以更加清晰的看到整个的查询计划。
第二部分是all type of join hints。在spark 2.4只支持broadcast。而spark 3.0除了支持broadcast,还支持sort merge,shuffle hash和cartesian。
第三部分是adaptive query execution。社区为什么要去做它,最主要的原因就是说,对于一些查询计划,在运行时能够拿到更准确的数据统计信息,可以选择最优的这种计划,对数据进行处理,从而提升spark处理数据的性能。主要包括三种场景。第一种是调整reducer的数量,从而避免额外的内存和IO的开销。第二种是说,选择最合适的join的策略。第三种是说,针对倾斜数据,在join的时候提供更好的处理方式。上述场景都是自动的,根据运行时的情况,自动地收集相关的信息,然后去做判断。
怎么去动态的调整reducer的数量。在spark 2.4,默认指定partition数量,每一个partition经过shuffle之后,对应的要处理的数据的大小可能是不一样的。这是由数据本身的特性来决定的,它的分布可能本来就是不均衡的。
在spark 3.0中,在shuffle的时候,每一个partition有不同的数据量大小,需要把小的partition数据进行合并,给同一个reducer去处理,从而使得每一个reducer它所处理的数据量大小是相近的。
针对有数据倾斜的这种join,在spark 2.4中带来的主要的问题就是说,在处理最大的partition时,要花费很长的时间,影响整个join。
在spark 3.0中,有数据倾斜的join,比在spark 2.4中更快。如图所示,对于表A和表B,我把大表的数据做切分,小表的数据做全量的分发。第一个,满足join的语义要求。第二个,在倾斜的这些key上面,它是被切成多分,然后在多个task里面去处理。
第四部分是dynamic partitioning pruning。在join操作中,要避免读取不必要的partition。而dynamic filter能够避免读取不必要的partition。
如下图所示,在spark 2.4中,大表中的所有数据都被读取。
而在spark 3.0中,通过pushdown with dynamic filter,能够减少大表中需要被读取的数据量。
如下图所示,是一个dynamic partitioning pruning的例子。
第五部分是Enhanced nested column pruning & pushdown,是针对于这种嵌套的数据结构的支持。在spark 2.4里面,其实已经提供了部分的这种支持。如下图所示的表里面,有column 1和column 2,而后者是一个嵌套的数据结构,它里面有两个字段。比如说,我查询的时候只查了column 2里面的第1个字段。去访问这个数据的时候,我只需要把column 2的第1个字段拿出来就行了,而不需要把整个column 2都拿出来。但是在spark 2.4里面它的支持是有限的。就是说,只能穿透有限的几个算子,比如说LIMIT这种算子,对于其他的一些算子是没办法的。
而在spark 3.0里面,对这一块进行了进一步的优化,能够支持把column pruning推到穿透所有的算子。
另外一种场景,就是说filter过滤的条件是根据嵌套字段里面的某一个子字段去做过滤,是不是支持把过滤条件也推到table scan里面。在spark 2.4里面也是不能够完全支持的。
而在spark 3.0里面,针对嵌套字段的filter,也是一直可以往下推到具体访问数据的table scan里面。
第六部分是Improved aggregation code generation,针对aggregation扩件的一个优化。
就是说,在spark里面我们去支持这种扩件,但是扩件会有一个限制。针对每个方法,如果大于8000 Java bytecode,HotSpot编译器就rollback,放弃生成native code。所以,如果你的这种SQL比较复杂,可能会没办法利用到扩件的这种特性。
在spark 3.0里面,针对这种情况做一些优化。简单来说,把一个方法拆分成多个方法,从而避免碰到8000 Java bytecode的限制。
具体的例子如下图所示。
第七部分是New Scala and Java,针对新的语言版本的支持。支持了新的Java 11这个版本,以及Scala 2.12版本。
关键词:Spark 3.0,SQL性能改进,Interactions with developers,Dynamic optimizations,Catalyst improvements,Infrastructure updates
获取更多 Spark+AI SUMMIT 精彩演讲视频回放,立刻点击前往:
>>SPARK + AI SUMMIT 2020 中文精华版线上峰会 7月4日第一场<<
>>SPARK + AI SUMMIT 2020 中文精华版线上峰会 7月5日第二场<<