背景
EMR Spark提供的Relational Cache功能,可以通过对数据模型进行预计算和高效地存储,加速Spark SQL,为客户实现利用Spark SQL对海量数据进行即时查询的目的。Relational Cache的工作原理类似物化视图,在用户提交SQL语句时对语句进行分析,并选出可用的预计算结果来加速查询。为了实现高效地预计算结果复用,我们构建的预计算缓存一般都较为通用,因此对于用户query,还需进行进一步的计算方能获得最终结果。因此,如何快速地找出匹配的缓存,并构建出准确的新执行计划,就显得尤为重要。
在Hive 3.x中支持的Materialized View,利用了Apache Calcite对执行计划进行重写。考虑到Spark SQL使用Catalyst进行执行计划优化,引入Calcite太重,因此EMR Spark中的Relational Cache实现了自己的Catalyst规则,用于重写执行计划。本文将介绍执行计划重写的相关内容。
执行计划重写
准备工作
Spark会把用户查询语句进行解析,依次转化为Unresolved Logical Plan(未绑定的逻辑计划)、Resolved Logical Plan(绑定的逻辑计划)、Optimized Logical Plan(优化的逻辑计划)、Physical Plan(物理计划)。其中,未优化的逻辑计划根据用户查询语句不同,会有较大区别,而Relational Cache作为优化的一部分,放在逻辑计划优化过程中也较为合适,因此我们拿到的用户查询计划会是优化中的逻辑计划。要与优化中的逻辑计划匹配,我们选择把这个重写过程放在Spark优化器比较靠后的步骤中,同时,预先将Relational Cache的逻辑计划进行解析,获得优化后的Cache计划,减小匹配时的复杂程度。这样,我们只需匹配做完了谓词下推、谓词合并等等优化之后的两个逻辑计划。
基本过程
在匹配时,我们希望能尽可能多得匹配计算和IO操作,因此,我们对目标计划进行前序遍历,依次进行匹配,尝试找到最多的匹配节点。而在判断两个节点是否匹配时,我们采用后序遍历的方式,希望尽快发现不匹配的情况,减少计划匹配的执行时间。然后我们会根据匹配结果,对计划进行重写,包括对于Cache数据进行进一步的Filter、Project、Sort甚至Aggregate等操作,使其与匹配节点完全等价,然后更新逻辑计划节点的引用绑定,无缝替换到逻辑计划中,这样就能轻松获得最终的重写后的计划。
Join匹配
Spark中的Join都是二元操作,而实际的Join顺序可能根据一些策略会有很大区别,因此对于Join节点,必须进行特殊处理。我们会首先将逻辑计划进行处理,根据缓存计划的Join顺序进行Join重排。这一步在树状匹配之前就进行了,避免不断重复Join重排带来的时间浪费。重排后的Join可以更大概率地被我们匹配到。
为了实现Cache的通用性,根据星型数据模型的特点,我们引入了Record Preserve的概念。这和传统数据库中的Primary Key/Foreign Key的关系较为类似,当有主键的表与非空外键指向的表在外键上进行Join时,记录的条数不会变化,不会膨胀某条记录,也不会丢失某条记录。PK/FK的语意在大数据处理框架中经常缺失,我们引入了新的DDL让用户自定义Record Preserve Join的关系。当用户定义A Inner Join B是对于A表Record Preserve时,我们也会把A Inner Join B和A的关系匹配起来。有了PK/FK的帮助,我们能匹配上的情况大大增加了,一个Relational Cache可以被更多看似区别巨大的查询共享,这可以很好的为用户节约额外的存储开销和预计算开销。
Aggregate匹配
一般的Aggregate匹配较为简单,而Spark支持的Grouping Set操作,会构建出Expand逻辑计划节点,相当于把一条记录转为多条,使用Grouping ID进行标记。由于Expand的子节点是所有Grouping的情况共用的,这里我们只对子节点进行一次匹配,再分别进行上面的Grouping属性和Aggregate属性的匹配。主要是验证目标聚合所需的属性或者聚合函数都能从某个Grouping ID对应的聚合结果中计算出来,比如粗粒度的Sum可以对细粒度的Sum进行二次Sum求和,而粗粒度的Count对细粒度的Count也应通过二次Sum求和,粗粒度的Average无法仅从细粒度的Average中还原出来等等。
计划重写
找出匹配的逻辑计划之后,就是重写逻辑计划的过程。对于无需二次聚合的逻辑计划,直接根据缓存数据的schema,从缓存数据的Relation中选择所需列,根据条件过滤后,进行后续操作。如果还需二次聚合,选择所需列时需保留外部要用的所有列,以及聚合时需要的列,还有聚合函数需要的数据。二次聚合的聚合函数需要根据实际情况进行重写,确保能使用Relational Cache中已经初步聚合的结果。这里面需要根据聚合的语意判断是否能够二次聚合。如果时Grouping Set的聚合,二次聚合之前还需选择正确的Grouping ID进行过滤。经过二次聚合后,步骤大体和普通的重写一致,只需替换到目标计划中即可。
结果
我们以一个例子来具体说明逻辑计划的重写结果。Star Schema Benchmark(论文链接)是星型模型数据分析的一个标准Benchmark,其结构定义如图所示:
我们构建Relational Cache的SQL语句如下:
SELECT GROUPING_ID() AS grouping_id, lo_discount, s_city, c_city, p_category, d_year, lo_quantity, d_weeknuminyear, s_nation, s_region, p_mfgr, c_region, d_yearmonth, p_brand, c_nation, d_yearmonthnum, SUM(lo_revenue) AS lo_revenue_SUM, SUM(lo_supplycost) AS lo_supplycost_SUM, SUM(V_REVENUE) AS V_REVENUE_SUM
FROM supplier, p_lineorder, dates, customer, part
WHERE lo_orderdate = d_datekey AND lo_custkey = c_custkey AND lo_suppkey = s_suppkey AND lo_partkey = p_partkey
GROUP BY lo_discount, s_city, c_city, p_category, d_year, lo_quantity, d_weeknuminyear, s_nation, s_region, p_mfgr, c_region, d_yearmonth, p_brand, c_nation, d_yearmonthnum GROUPING SETS ((d_year, d_weeknuminyear, lo_discount, lo_quantity), (d_year, lo_discount, lo_quantity), (lo_discount, lo_quantity), (d_yearmonthnum, lo_discount, lo_quantity), (d_year, p_category, p_brand, s_region), (d_year, p_category, s_region), (d_year, s_region), (d_year, s_region, c_region, s_nation, c_nation), (d_year, s_city, c_city, s_nation, c_nation), (d_year, s_city, c_city), (d_year, d_yearmonth, s_city, c_city), (d_year, s_region, c_region, c_nation, p_mfgr), (d_year, s_region, s_nation, c_region, p_mfgr, p_category), (d_year, s_nation, s_city, c_region, p_brand, p_category, p_brand), (d_year, s_nation, s_city, c_region, p_brand, p_category), (d_year, s_nation, s_city, c_region, p_category, p_brand))
我们从中选出一条查询作为示例。具体查询语句:
select c_city, s_city, d_year, sum(lo_revenue) as revenue
from customer, lineorder, supplier, dates
where lo_custkey = c_custkey
and lo_suppkey = s_suppkey
and lo_orderdate = d_datekey
and c_nation = 'UNITED KINGDOM'
and (c_city='UNITED KI1' or c_city='UNITED KI5')
and (s_city='UNITED KI1' or s_city='UNITED KI5')
and s_nation = 'UNITED KINGDOM'
and d_yearmonth = 'Dec1997'
group by c_city, s_city, d_year
order by d_year asc, revenue desc
原始逻辑计划如下所示:
Sort [d_year#39 ASC NULLS FIRST, revenue#0L DESC NULLS LAST], true
+- Aggregate [c_city#6, s_city#31, d_year#39], [c_city#6, s_city#31, d_year#39, sum(lo_revenue#23L) AS revenue#0L]
+- Project [c_city#6, lo_revenue#23L, s_city#31, d_year#39]
+- Join Inner, (lo_orderdate#16 = d_datekey#35)
:- Project [c_city#6, lo_orderdate#16, lo_revenue#23L, s_city#31]
: +- Join Inner, (lo_suppkey#15 = s_suppkey#28)
: :- Project [c_city#6, lo_suppkey#15, lo_orderdate#16, lo_revenue#23L]
: : +- Join Inner, (lo_custkey#13 = c_custkey#3)
: : :- Project [c_custkey#3, c_city#6]
: : : +- Filter (((isnotnull(c_nation#7) && (c_nation#7 = UNITED KINGDOM)) && ((c_city#6 = UNITED KI1) || (c_city#6 = UNITED KI5))) && isnotnull(c_custkey#3))
: : : +- HiveTableRelation `ssb`.`customer`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c_custkey#3, c_name#4, c_address#5, c_city#6, c_nation#7, c_region#8, c_phone#9, c_mktsegment#10]
: : +- Project [lo_custkey#13, lo_suppkey#15, lo_orderdate#16, lo_revenue#23L]
: : +- Filter ((isnotnull(lo_custkey#13) && isnotnull(lo_suppkey#15)) && isnotnull(lo_orderdate#16))
: : +- HiveTableRelation `ssb`.`lineorder`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [lo_orderkey#11L, lo_linenumber#12L, lo_custkey#13, lo_partkey#14, lo_suppkey#15, lo_orderdate#16, lo_orderpriotity#17, lo_shippriotity#18, lo_quantity#19L, lo_extendedprice#20L, lo_ordtotalprice#21L, lo_discount#22L, lo_revenue#23L, lo_supplycost#24L, lo_tax#25L, lo_commitdate#26, lo_shipmode#27]
: +- Project [s_suppkey#28, s_city#31]
: +- Filter (((isnotnull(s_nation#32) && ((s_city#31 = UNITED KI1) || (s_city#31 = UNITED KI5))) && (s_nation#32 = UNITED KINGDOM)) && isnotnull(s_suppkey#28))
: +- HiveTableRelation `ssb`.`supplier`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [s_suppkey#28, s_name#29, s_address#30, s_city#31, s_nation#32, s_region#33, s_phone#34]
+- Project [d_datekey#35, d_year#39]
+- Filter ((isnotnull(d_yearmonth#41) && (d_yearmonth#41 = Dec1997)) && isnotnull(d_datekey#35))
+- HiveTableRelation `ssb`.`dates`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [d_datekey#35, d_date#36, d_dayofweek#37, d_month#38, d_year#39, d_yearmonthnum#40, d_yearmonth#41, d_daynuminweek#42, d_daynuminmonth#43, d_daynuminyear#44, d_monthnuminyear#45, d_weeknuminyear#46, d_sellingseason#47, d_lastdayinweekfl#48, d_lastdayinmonthfl#49, d_holidayfl#50, d_weekdayfl#51]
重写后的一个逻辑计划如下:
Sort [d_year#47 ASC NULLS FIRST, revenue#558L DESC NULLS LAST], true
+- Aggregate [c_city#22, s_city#39, d_year#47], [c_city#22, s_city#39, d_year#47, sum(cast(lo_revenue_SUM#773L as bigint)) AS revenue#558L]
+- Filter ((((((((isnotnull(s_nation#40) && ((s_city#39 = UNITED KI1) || (s_city#39 = UNITED KI5))) && (s_nation#40 = UNITED KINGDOM)) && isnotnull(d_yearmonth#49)) && (d_yearmonth#49 = Dec1997)) && isnotnull(c_nation#23)) && (c_nation#23 = UNITED KINGDOM)) && ((c_city#22 = UNITED KI1) || (c_city#22 = UNITED KI5))) && (grouping_id#662 = 19322))
+- Relation[grouping_id#662,lo_discount#759,s_city#39,c_city#22,p_category#762,lo_quantity#763,d_weeknuminyear#764,s_nation#40,s_region#766,p_mfgr#767,c_region#768,p_brand1#769,c_nation#23,d_yearmonthnum#771,d_yearmonth#49,lo_revenue_SUM#773L,lo_supplycost_SUM#774L,V_REVENUE_SUM#775L,d_year#47] parquet
由此可见,执行计划大大简化,我们可以做到亚秒级响应用户的命中查询。
进一步优化
在实际测试过程中,我们发现当多个Relational Cache存在时,匹配时间线性增长明显。由于我们在metastore中存储的是Cache的SQL语句,取SQL语句和再次解析的时间都不容小觑,这就使得匹配过程明显增长,背离了我们追求亚秒级响应的初衷。因此我们在Spark中构建了逻辑计划缓存,将解析过的Relational Cache的计划缓存在内存中,每个Relational Cache只缓存一份,计划本身占用空间有限,因此我们可以缓存住几乎所有的Relational Cache的优化后的逻辑计划,从而在第一次查询之后,所有查询都不再收到取SQL语句和再次解析的延迟困扰。经过这样的优化,匹配时间大幅减少到100ms的量级。
总结与思考
Relational Cache实现了一种基于Cache的优化方案,让Spark SQL能够用于即时查询的场景下,满足用户对海量数据秒级查询的需求。通过对用户查询的动态改写,可以大大提高缓存的利用率,扩展缓存的命中场景,有效提高查询性能。现有方案也有很多可优化的地方,比如重复的回溯遍历时间复杂度较高,不如在逻辑计划节点内部更新维护可匹配的信息。考虑到对Spark的侵入性,我们暂时选择了现有方案,后续根据实际的使用情况,还会进一步优化我们的逻辑计划重写过程。而重写的逻辑计划还涉及到基于不同的Relational Cache Plan会有不同的重写方式,在这些重写结果中如何根据执行代价选择最优的重写方案,将会在后续文章中进行揭秘,敬请期待!