这是MaxCompute有关SQL优化器原理的系列文章之一。我们会陆续推出SQL优化器有关优化规则和框架的其他文章。添加钉钉群“关系代数优化技术”(群号11719083)可以获取最新文章发布动态。
本文主要介绍MaxCompute Optimizer对Shuffle方面的相关优化。
1 简介
分布式系统中,Shuffle是重操作之一,直接影响到了SQL运行时的效率。Join、Aggregate等操作符都需要借助Shuffle操作符,确保相同数据分发到同一机器或Instance中,才可以进行Join、Aggregate操作。对于这些需要经常使用到Shuffle场景,如何减少Shuffle,删除一些不必要的Shuffle是提升性能的一个关键。
例如
select count(*), c1 from t group by c1;
假设t表如果是Hash Clustering Table[2],则GroupBy的计算全部可以本地化处理,不需要再次Shuffle,因为相同c1值已在同一台机器了。
Optimizer Plan
OdpsPhysicalProject(cnt=[$1], c1=[$0]): rowcount = 1.5, cumulative cost = {6.30 rows, 3.00 cpu, 34.40 io, 0.00 memory, 0.00 network}, id = 281
OdpsPhysicalSortedAggregate(group=[{0}], __agg_0=[COUNT()]): rowcount = 1.5, cumulative cost = {4.80 rows, 3.00 cpu, 26.40 io, 0.00 memory, 0.00 network}, id = 280
OdpsPhysicalTableScan(table=[[wlz_p_02.t_test, c1(1) {0}]]): rowcount = 3.0, cumulative cost = {3.30 rows, 0.00 cpu, 26.40 io, 0.00 memory, 0.00 network}, id = 206
只需要一个Map Task就可以完成整个Query执行。后续会详细介绍如何优化。
2 Shuffle优化原理
MaxCompute Optimizer是基于开源项目Calcite基础上搭建的一套Optimizer框架。Calcite提供了Volcano模型的Planner,MaxCompute Optimizer引入了Volcano模型中Enforcer机制[1]来优化Shuffle。
简单介绍下Enforcer概念,Enforcer是指操作符(算法,如Join的实现SortedMergeJoin、Aggregate的2Pass实现等)要求输入数据必须具有一些物理数据属性(Trait),如order、distribution等。
如SortedMergeJoin要求输入数据必须基于Join keys进行分布且有序,这些为了确保满足SortedMergeJoin算法的要求。如果数据不是按照Join Keys分布,则相同Key值的数据不在同一个Instance里,则无法达到Join的目的;如果数据不是基于Key值有序,则无法满足Sorted Merge的要求。简单讲就是一个具体算法决定了输入数据的要求。在分布式环境中,数据要求是通过Shuffle实现。而Shuffle数据特性,我们称之为Trait。
2.1 Enforce Rule
如何确保对于任何算法满足其数据特性Trait的要求?
MaxCompute Optimizer实现了一种叫Enforcer Rule,来保证只要任何操作符对其输入(Input)要求一个Trait,Enforcer Rule就会保证Input的操作符一定会提供这种特性的数据。
图1 Enforcer Rule
图1展现了Enforcer Rule的工作机制。
1)任何Operator(算法)会对Input产生一个Required Trait。如SortedMergeJoin,则要求每一路Input必须是基于Key的分布且有序,类似Trait(Hash(c1) sort(c1 asc))。这一步由Build Rule实现,即对于任何Operator当采用某种算法时,必须将Required Trait的要求下推给Input。如Join当生成SortedMergeJoin时,则SortedMergeJoin的Input必须带有Required Trait。
2)Enforcer Rule捕获Required Trait + Input的Pattern。即图1中Required + Input Operator模式。
3)Shuffle生成。
Required+Input方式处理Shuffle提供了三种可能性:
A)情况1:Input Operator不能确保数据具有Trait特性,则直接在Input输出后生成基于Trait的Shuffle。这样Parent给到的要求得到了满足。
B)情况2:Input Operator操作符已经具有了Required Trait的特性,则Shuffle就不需要添加,即达到了减少Shuffle的目的。Parent要求的Trait也得到了满足。
C)情况3:Input Operator可以保证数据的特性可以传递,则可以将Required Trait继续下推到自己的Input中。则Required Trait继续由Input来满足要求,而当前Input本身可以确保数据特性不会发生改变。这种情况下,Required Trait也得到了满足。
任何一个Operator都是采用上述三种策略进行处理,从而使得Required Trait可以从Root Operator一直下推。
举例:Required Trait + Filter
当一个Required Trait与Filter这样一个Pattern被捕获时,如何保证Required Trait得到满足呢?
如果将Filter理解为一个当输入数据传入给Filter Operator时,Filter Operator仅仅是将每一行的数据进行判断是否满足condition条件,如果不满足condition条件的行数,则不输出。所以发现Filter具有一个特性,即不会改变数据的输入特性。所以当Required Trait要求Filter具有这种特性时,可以有两种处理方式:
1)直接生成Shuffle。
2)将Required Trait继续要求Filter的Input保持这种特性。
思考:是否可以直接将Required Trait下推到Filter Input得到满足?
答:不可以。这里有两种选择,即Shuffle是在Filter之前还是之后生成,这些由Optimizer另外一个特性来决定,即CBO(Cost-based Optimizer),也就是由Cost来决定选择是1)还是2),因为继续下推给Input,也有几种情况,要么生成Shuffle,要么继续下推等,这时Shuffle生成的位置则由CBO控制。
2.2 Operator算法
当根据Optimizer生成的Plan真正运行Operator时,必须严格要求如何保证数据特性来实现。如Filter,理论上实现时,可以不保证数据和其输入数据的特性一致,如果是这样,则这些优化都无法实现。因为Operator实现的算法不满足要求。所以Optimizer与Runtime算法必须要求一致。
如SortedAggregate必须保证基于Group By key分布有序等。
3 应用场景
3.1简单case
假设T1是clustering Table[2]
select count(*), c1 from t group by c1;
图2 详细实现步骤
图2展示了Optimizer如何减少Shuffle的详细步骤。
1)Pull Trait。根据Input来获取可能会产生的Trait。
2)convert(input, Trait[hash])。Aggregate build成SortedAggregate,则要求Project基于Key hash且有序,即Trait[hash(c1) sort(c1 asc)]。
3)Enforcer处理Required Trait[hash(c1) sort(c1 asc)] 与Project Pattern,将Trait下推给TableScan。
4)Required Trait与TableScan发现Trait一致,则Shuffle不需要添加。从而达到了减少Shuffle的优化目的。
上述逻辑介绍了Optimizer如何一步一步达到减少Shuffle的目的。主要关键点是Operator算法要求Input 满足Trait以及Trait与Input Pattern如何满足Parent Operator的要求。
3.2 TPC-H
基于TPC-H,给出Q4的Shuffle优化例子。Q4特征: Join的输入分别是Table和Group。且Join Key是Table的Clustering key和Group by key。
Q4:
create table q4_result_xx as
select
o_orderpriority,
count(*) as order_count
from
tpch_orders o
join
(select
distinct l_orderkey
from
(
select
*
from
tpch_lineitem
where
l_commitdate < l_receiptdate
) tab1
) tab2
on tab2.l_orderkey = o.o_orderkey
where
o.o_orderdate >= '1993-07-01' and o.o_orderdate < '1993-10-01'
group by
o_orderpriority;
图3 MaxCompute Optimizer Plan
图3中显示SortedMergeJoin的两路输入都不存在Shuffle,同时Aggregate这路也没有Shuffle,相当于减少了3次Shuffle(Aggregate一次+Join两次)。
图4 优化Shuffle DAG
图5 Shuffle不优化DAG
优化Shuffle VS不优化之间差别在于TableScan由于已经基于Key分布,所以Aggregate和Join的Required Trait都可以下推到TableScan,而TableScan都是基于这些key的Clustering Table,Required Trait得到满足,从而Shuffle都不需要。优化Shuffle后的Plan,M1中执行了Aggregate以及Join整个操作,而不需要类似不优化方式,根据Shuffle将Aggregate和Join切分成不同Task进行处理。从性能上讲,Shuffle优化方式耗时54s,而不优化方式耗时121s,性能提升一倍。
4 总结
本文主要从Optimizer实现角度上详细讲解优化Shuffle的实现原理以及一些应用场景。Shuffle优化对性能提升有较大帮助,目前主要应用在Clustering Table上,详细性能测试对比可以见ATA Hash Clustering文章[2]。
索引
[1] Goetz Graefe. The Volcano Optimizer Generator: Extensibility and Efficient Search
[2] ODPS Hash Clustering支持 (内部公测功能)
有任何Optimizer相关问题和反馈,请加入“关系代数优化技术”群获取相关支持。