EMR团队探索并开发了SparkSQL Native Codegen框架,为SparkSQL换了引擎,新引擎带来最高4倍性能提升,为EMR再次获取世界第一立下汗马功劳。来自阿里云EMR团队的周克勇将详细介绍Native Codegen框架。本文整理自视频 https://developer.aliyun.com/live/43579
本次分享主要分为三部分,第一做这件事情的动机和背景,第二做的过程中解决的核心问题,最后是总结。
有些同学可能了解到,EMR团队今年4月份打破了大数据领域Benchmark TBCDS的世界纪录。在硬件完全相同的情况下,性能提升了一倍,从520万分提高到1100多万分。这个成绩的背后主要依赖两条技术,增强了Optimizer和Native Runtime。Optimizer层面,我们在之前工作的基础上,又做了诸如 CTE 、动态分区裁剪、小表广播、PK/FK、Fast Decimal等优化。
大家如果关注刚结束的SparkSummit,会发现一些类似的技术,如动态分区裁剪已经进了最新的Spark3.0,EMR版本的Spark在几个月之前就支持了。另一条技术是Native Runtime,也是今天分享的主题,涵盖的主要工作,包括 Native Codegen、统一内存布局、Batch化执行框架,后续会详细介绍。
大家都知道 Optimizer的目的是获取最好的执行计划,主要技术包括states收集和Cost Model,难点是静态states不够准确,无法在Plan阶段准确预知Filter或join之后的数据量,因此对后续Plan的代价评估不够准确。
今年SparkSummit发布的adaptive Execution,就把动态stats收集和plan优化结合在一起来解决这个问题。相对应的 Runtime的目的是针对选定的plan,如何使它跑得更快,长期以来 Runtime的主要工作基本上都聚焦在解决当下的新硬件瓶颈。如MapReduce刚出来时,网络带宽是瓶颈,所以Google做了很多locality方面的优化。Spark刚出来时解决的硬件瓶颈是磁盘I/O,它通过内存缓存来提升性能。
再后来 CPU成了新的瓶颈,我们可以看到从10年到20年,磁盘I/O和网络带宽都有了每年数量级的提升,但是CPU的主频基本上保持不变,因此CPU成了新的硬硬件瓶颈,提升CPU性能,成为近年来 Runtime领域重要的优化方向。优化CPU主要有两条技术路线,向量化和Codegen。我们先看一下传统的 SQL执行所应用的火山模型的问题所在,这是一个简单的Select家,Filter加Project加Agg的例子。
在执行的过程中,在火山模型中,每个算子都是一个迭代器,下游的算子,调上游算子的next方法,next返回当前算子处理之后的中间结果。这个模型最大的问题是每条record在经过每一个算子的时候,都要经过一次虚函数调用,而虚函数调用的开销是非常大的。
第二个问题就是在每个算子之间需要把中间的结果物化到内存。针对这个问题,向量化技术给出的解,是通过批量执行加列式存储,加小循环,来更好的利用 SIMD的指令和CPU的乱序执行,从而最大化数据并行度和指令并行度,从而分摊掉虚函数调用的开销,并提升执行性能。
例如上面例子里Agg 算子计算过程,他把输入 column1,column2以及 Agg的输出结果sum都存在数组里,然后通过一个很紧凑的for循环进行计算。由于循环足够简单,编译器会做循环展开和SIMD的优化。从截图中我们可以看到,编译器生成了很多向量化的指令,此外,由于for循环足够简单,然后for循环内部基本上都是访存指令,如访问colum1的第i个数据,colum2的第i个数据,所以每次放循环最主要的时间都是在进行访存,而因为 for循环足够的短,所以CPU的乱序执行的窗口里,可以同时发射多条漏斗指令,从而解决了 Memory Wall的问题。
这个技术的代表是MonetDB/X100(2005),以及今年SparkSummit宣布的 photon(2020)技术,主要的缺点是中间缓存的数据量比较大,Codegen技术的给出的解释算子融合,他打破了Stage内部算子间的界限,拼出来跟原来的逻辑保持一致的裸的代码通常是一个大的for循环,然后把拼成的代码编译成可执行文件,这里 面展示的跨越的第一个Stage拼出来的代码,可以看到最外层是一个大的for循环,接下来是Filter,表达了 Filter算术的语义,然后在Filter的内部是Agg的语意,拼出的代码完全不存在迭代器和额外的函数调用,就像是一个新手手写的代码,而这种代码不存在任何框架上的Overhead,性能往往是最好的。
Spark的Codegen把拼成的代码交给 Janino模块做编译,在运行的时候直接load即时编译出来的class文件。Codegen技术的好处有几点:
1.用for循环代替了迭代器,完全消除了虚函数调用;
2.没有了雾化,中间数据都保存在寄存器里。它的缺点就是因为 for循环比较大,而且每次迭代执行的逻辑非常的复杂,所以很难应用SIMD的优化。这个技术的代表是Hyper和Apache Spark,尽管Spark的Java Codegen,相比之前有了数量级的提升,但依然有一些不足。首先是Java的性能还是弱于Native Code,二是Java语义的限制,例如无法显示使用 SIMD或Prefetch之类,并且由于机器的存在,无法自主精细化控制内存。
3.NativeCode更容易跟新硬件进行交互。基于这个原因,我们决定使用 Native Runtime替换Java Runtime。同时我们不想对现有的Spark做太多的改动,所以最终我们选择了Codegen技术路线,结合起来就是Native Codegen。
接下来介绍我们做Native codegen解决的核心问题,集中在三个方面,我们要生成什么代码,怎么生成这些代码,以及怎么样跟Spark做集成。
第一个问题,生成什么?
如今的NativeCode有很多,C/C++。Go Rest,LLVM等。基于我们自己的技能点,其实可以选择的就只有C/C++, C++实现起来相对直观,只需要对照原来生成的Java代码,替换成C++即可。但C++最大的问题是它在编译时间过长,根据HyPer的论文,C++的编译时间比LLVM高出了一个数量级。LLVM的编译时间很短,而且执行的效率跟C++相当,看上去是一个很不错的选择。
其实很多Native Codegen这样的系统都选择了LLVM,包括HyPer,Impala以及阿里云自研的MaxComputer,ADB等,但LLVM对我们来说还是过于复杂,它的语法接近汇编,是想用汇编重写SQL算法的工作量会有多大,其实大多数引擎也不会用
LLVM写全量的代码,比如HyPer,解码算子的核心逻辑,用LLVM生成其他通用的功能,包括spill复杂数据结构的管理等,实际上是用C++提前编写好并进行编译。 即便如此,LLVM对我们来说依然过于复杂,在广泛调研之后,另外一种可能性出现了 Weld。
先介绍一下Weld
这个是Spark的作者matei的学生的作品,他提供了包括Language+Compiler+Runner的工具链,最终会转化成LLVM,然后用LLVM的工具链编译执行, Weld最初想解决的问题是不同lib之间相互调用时数据传输的开销,例如要在pandas里调用numpy的接口,首先pandas把数据写入内存,然后numpy读取内存进行计算。
对于极度优化的Library来说,内存的写入和读取的时间可能会远超计算本身。针对这个问题,Weld开发了Common Runtime,并配套提供了一组IR,再加上惰性求值的特性,只需要简单修改Library,使其符合Weld的语法规范,便可以做到不同Library共用Weld的Runtime,再利用惰性求值实现快Library的Pipeline,从而省去数据物化的开销。Weld Runtime还做了若干优化,如循环融合循环展开,向量化自适应执行等。
此外Weld支持调用C代码,可以方便的调用三方库。我们感兴趣的是Weld提供的IR和对应的Runtime。
Weld IR语法是针对关系代数进行设计的,非常适合表达SQL语句。数据结构层面,Weld IR最核心的数据结构是vec和struct,对应C语言里的数组和struct,能较好的表达Spark SQL的 Row Batch基于struct和vec,可以构造字典数据结构,能够比较好的表达SQL里面重度使用Hash结构,操作层面,Weld IR提供了类函数式语言的语义,如Map,Filter,Iterator等配合Builder语义,能方便地表达Project、Filter、Agg、Broadcast join等算子语义,例如 select加Filter的例子,用Weld IR的表达如下,第一行是函数签名,表示入参是一个数组,数组的元素是一个struct,strut包含两个int32的成员。
接下来就是一个大的 for表达式,跟常见的语法不同,for表达式包含三个参数
1.需要遍历的数组;
2.Build,用来生成最终的结果。 Build类型也决定了最终生成的结果的。用什么数据结构来存储。
3.lambda,用来定义针对每个元素的操作,在这个例子里面,第一个参数就是这个函数的入参v第二个参数是append,表示最终构造的结果,存在一个数组里面。第三个,lambda参数是一个if表达式, if的语义跟我们常见的也不太相同,它实际上是把 if的true和false的两个分支都作为参数表达,其中第一个参数是condition,第二个参数是当condition为true的时候,所执行的逻辑。
第三个参数肯定是认为false的时候执行的逻辑,在这个里面可以看到当第二个成员它是从0开始计数,当第二个成员大于10的时候,会把第一个成员 merge到 appender里面。否则的话就什么都不做,直接返回原来的build。Weld的IR。通过 weld_module_compile和weld_module_run,两个接口,分别做编译和执行。由于Weld同时兼顾了语法简洁,编译时间短的特性,因此我们选择Weld作为生成的目标。
第二个问题就是怎么生成?
我们复用了Spark Codegen框架。我们知道 Spark Codegen包含Expression和Stage两个级别,在Expression级别,我们对照原来的doGenCode()的接口,增加了doGenNativeCode(),里面拼出来的是Weld的语法,例如之前可能Java的代码里面就直接是两个变量的相加,然后改造了以后就成了一个struct的两个成员的相加。在WholeStage级别,我们复用了producer/ consumer的框架,熟悉Spark源码的同学应该了解到,在producer/consumer框架下,每个算子都提供了produce和consume接口,produce的职责是生成为下游提供数据的代码,consumer的职责是生成消费上游数据的代码,Spark 中并非所有的算子都支持Codegen,例如outjoin就不支持支持Codegen的算子,继承了CodeGenSupport的接口,我们对整个producer/consumer的框架并没有改动,在他们旁边又新增加了一系列的接口,包括 NativeCodeGenSupport/doProduceNative/doConsumeNative。
以一个具体的例子加以说明,还是一个相比较简单的select加Filter加Project的例子, query包含三个算子,Scan、Filter、Project。
然后 query他的代码生成的过程是右上角的这张图。首先 project就是最下游的算子,它的produce方法会返回最终生成的代码的字符串。然后它这个是怎么生成的呢?Project 的doProduce。直接调用了 Filter的doProduce方法,然后Filter的doProduce方法直接调用了Scan的doProduce,然后Scan的 doProduce会生成一个框架代码,在框架代码的内部会调用Scan的 doConsume。Scan的doConsume。直接调用Filter的doConsume。Filter的doConsume会生成Filter的逻辑,并在内部调用Project的doConsume,Project的 doConsume。会把最终的数据输出 append的到 output中。
我们看下面这三张图,Scan的 doProduce会生成for循环的一个架子。然后在for循环的每个迭代里面调用 Filter的doConsume方法, Filter的doConsume会生成一个if的表达式的框架,然后在判断为true,也就是if的内部的话,调用的是project的 doConsume。 最后project的doConsume拼成一段append 的方法把column1 append到 output里面到此为止。一个完整的Java的Codegen过程就结束了,然后我们就拿得到了直接可以编译的Java代码,当然这个是简化的过程。
对于Native Codegen的话,我们是复用了这个逻辑,只是把生成的Java代码替换成了Weld的IR,如底下三张图所示,具体的Weld,语法我就不详细展开了。
感兴趣的同学可以到Weld官网上看语法定义,代码生成还有一个问题就是Fallback机制,由于人力有限,我们无法覆盖所有的算子,因此需要实现Fallback机制。这里需要做的决定是应该做算子级别的Fallback,还是Stage级别的Fallback。直观上算子粒力度的Fallback好像更加合理,实际上却会导致更严重的问题。它会导致Stage内部Pipeline的断裂。前面讲了Codegen的一个优势是整个过程不存在物化,而算子力度的Fallback则会导致Stage内部一部分算是走Native Runtime,另一部分走Java Runtime,两者的连接数无可避免存在数据物化,开销通常会大于Native Runtime带来的收益。
基于这个原因,我们选择Stage级别的Fallback,一旦有任何算子不支持Native Codegen,在整个Stage都Fallback到Java Codegen,代码也已经生成了。
最后的问题,如何跟Spark集成 。
task的执行可以理解为一个黑盒,它的输入是Row Batch或者Row Iterator我们知道在Scan Stage Spark用了向量化读的优化,读出来的是列式存储的 column batch,每一列本质上都用一个数组进行存储,而在Shuffle Stage,Shuffle Fetch回来的数据结构是行式存储的 unsaferowbatch。每个Stage的输出会封装成会封装成Row Iterator。
我们前面讲到既然选择了Stage级别的Fallback,意味着黑盒要么是Java Runtime要么是Native Runtime,不存在混合的情况。因此我们关心的如何把输入转化为Weld认识的内存布局,以及如何把Weld的输出包装成Row Iterator。针对列存数据,打开offheap开关,数据天生就是指针数组,Weld可以直接操作。对于行存数据,主要问题是变长数据难以映射到 Weld的 struct右上方的图展示了Spark Row Batch的内存布局,首先是固定长度的,null bitmap,然后是固定长度的列数据,最后是变长数据,由于变长数据的存在,无法直接把一条record映射成 strut。
我们的做法是把定长部分和变长部分分别拷贝出来,并有offset和length来标志变量部分的位置和长度。这样一来record就能映射到strut结构了,而整个Row Batch就映射成了一个 vec strut。例如这个例子,每个record包括两个long和一个String,null bit用一个long表示,紧接着是两个long表示两个列的数据。第三个long保存变长数据的 offset和length,最后是变成部分,我们把变成拷贝出去之后,根据原先的offset和length,计算新的offset和length,最终我们用1个包含5个long的strut表示 record,分别是 null bitmap原先的两个long offset和length。这样一来我们就完成了统一内存布局,并且当且仅当有变长数据存在的时候才需要拷贝,否则的话是不需要拷贝的。Weld输出转换成Row Batch是刚才所说的过逆向过程,这里就不再赘述了,完成了数据转换,最后是Spark的执行流程。首先我们尝试走Native Codegen,若有异常发生,则切换到Java Codegen。若没有异常,则执行StageInit做初始化工作,包括初始化Weld,加载编译好的Weld module,拉取Broadcast数据等。
接着是一个循环,每个循环会读取一个Row Batch,给Native Runtime来执行,输出结果包装成Row Iterator,给Shuffle Write。以上就是EMR团队在Native Runtime上做的探索。总结下来,我们采用Weld的IR作为代码生成的目标语言,复用了Spark Codegen框架,进行代码生成,采用了Stage级别的Fallback机制,并通过统一内存布局跟Spark做了集成。
由于时间有限,一些工作没有包含在今天的分享中,例如 Weld的不好表达的算子,如SortMergejoin、Partitionby ,我们其实也用了Native的技术进行了优化。再例如 Weld本身的字典的实现效率比较低,我们也对此进行了比较大的优化。除了Native Runtime,EMR团队在Spark很多技术点都做了工作,欢迎大家交流沟通。