使用Relational Cache加速Spark数据分析
背景
Cache被广泛应用于数据处理的各个领域和方向上,在目前,计算速度远远大于IO访问速度依然是计算设备上最突出的矛盾,计算设备上的存储从HDD -> SSD -> NVMe -> Mem -> L3-L2-L1 Cache -> 寄存器 -> CPU,存储设备距离CPU越近,计算和IO访问速度的差距越小,数据处理的速度越快,但同时存储从下到上,价格越来越贵,容量越来越小。Cache以更多的资源消耗为代价,将待处理数据预先推到离计算更近的位置,从而加速数据处理的速度,填补计算和IO访问速度的差距。对于Spark来说,HDFS cache,Alluxio等文件系统都提供了文件级别的Cache服务,通过将文件cache到内存中,加速数据处理的速度,并且对Spark这样的计算框架完全透明。
除此之外,还有另外一种Cache的思路,如果需要多次对同一数据进行处理,且处理逻辑有相通之处,我们可以把中间结果cache起来,这样每次进行数据处理时从中间结果进行处理,节省了从原始数据到中间结果之间的计算。Cache的数据离计算结果更近,相比原始数据,经过更少的计算就能得到结果,同样也会加速处理速度。数据仓库中的物化视图是这种cache类型的典型应用。
在Spark中,也提供了Dataset级别的Cache,用户可以通过SQL DDL或是Dataset API将带有schema信息的关系型数据(而非文件)cache到内存。基于Dataset后续的数据处理都可以通过直接读取cache在内存中的数据而节省计算Dataset的时间。不同于数据仓库中的物化视图,Spark目前的Dataset cache还存在很多的不足之处:
- Spark Cached Dataset只能在同一个Spark Context中重用,跨Spark Context无法共享,且当Spark Context退出后,cache数据也会被删除。
- Dataset Cache,只支持执行计划精确匹配重用,即只有后续查询的执行计划能够精确匹配cached dataset的执行计划,才能使用cache优化查询,这大大降低了cache的优化范围。
- Cache的Dataset数据只能保存在内存或本地磁盘,数据量较大时对内存需求较大,而持久化的数据是序列化二进制数据,没有数据schema信息,反序列化代价较大,而且无法支持project filter pushdown等SQL优化处理。
Relational Cache的设计
基于上面提到的缺点,Spark Dataset cache在实际应用中的使用并不广泛,也无法满足一些典型的交互式分析场景,比如基于星型模型多维数据的分析,一般是通过提前构建Cube,通过SQL执行计划重写,满足亚秒级的交互式分析需求。Relational Cache希望能够兼顾Spark Dataset Cache的易用性和物化视图的优化效率,主要的目标包括三个:
- 用户可以cache任意关系型数据,包括Table,View或是Dataset。对于任意关系型数据的cache支持可以大大扩展了Relational Cache的使用范围,任何包含重复计算或是可预先确定计算逻辑的使用场景都可能从Relational Cache获益,例如多维数据分析,报表,Dashboard,ETL等。
- cache数据支持存放在内存,本地磁盘或是任意Spark支持的Datasource中。存放在内存的临时cache数据访问速度非常快,但是不支持跨Spark Context共享。对于数据量比较大的cache,例如很多企业构建的物化视图或是Cube可能达到PB量级,显然在这种情况下Relational Cache更适合存储在类似HDFS,OSS这样的持久化分布式文件系统中。
- cache数据可用于优化后续任意可优化的用户查询。
EMR Spark通过扩展Spark实现Relational Cache,我们的工作主要包括如下几个部分:
- Spark SQL DDL扩展,扩展已有的CACHE语法,支持对任意Table/View的cache的增删改查。
- Metastore对cache meta信息的支持。通过metastore支持持久型的cache元数据管理。
- 扩展Spark Catalyst,支持Cache Based Optimizer,可以通过in-memory或是持久化的cache优化后续查询的执行计划。
- 基于CBO的cache选择,可能有多个cache满足执行计划重写,选择合适的cache用于最终的执行计划重写。
Relational Cache的使用
创建Relational Cache
CACHE [LAZY] TABLE table_name
[REFRESH ON (DEMAND | COMMIT)]
[(ENABLE | DISABLE) REWRITE]
[USING datasource
[OPTIONS (key1=val1, key2=val2, ...)]
[PARTITIONED BY (col_name1, col_name2, ...)]
[ZORDER BY (col_name3, col_name4, ...)]
[CLUSTERED BY (col_name5, col_name6, ...) INTO num_buckets BUCKETS]
[COMMENT table_comment]
[TBLPROPERTIES (key1=val1, key2=val2, ...)]]
[AS select_statement]
创建cache的语法规范如上,我们可以通过该语法可以cache任意Spark表或视图,支持json,parquet,orc等数据格式,HDFS,OSS等数据源,以及partition, bucket和z-order等cache数据的组织方式。
REFRESH ON (DEMAND || COMMIT)
指定cache的更新方式,是在基表数据发生更新(COMMIT模式)时自动更新,还是用户通过更新DDL(DEMAND模式)手工触发更新。
(ENABLE | DISABLE) REWRITE
指定是否允许该cache被用于后续的执行计划优化。
此外,EMR Spark还提供和扩展了了更多的Relational Cache相关的DDL用于cache的增删改查。
UNCACHE TABLE [IF EXISTS] table_name
ALTER TABLE table_name (ENABLE | DISABLE) REWRITE
ALTER TABLE table_name REFRESH ON (DEMAND | COMMIT)
REFRESH TABLE cache_name
SHOW CACHES
(DESC | DESCRIBE) (EXTENDED | FORMATTED) table_name
EMR Spark还提供了session级别的参数控制是否开启基于Relational Cache的执行计划优化,用户可以通过spark.sql.cache.queryRewrite
参数开启或者关闭执行计划优化。
使用Relational Cache优化查询
下面通过一个简单的示例展示Relational Cache是如何优化Spark查询的。原始的查询SQL为:
SELECT n_name, sum(o_totalprice)
FROM orders, customer, nation
WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey
GROUP BY n_name
对应的物理执行计划包括两次Join以及Aggregate操作,执行时间为16.9s, 如下所示:
== Physical Plan ==
*(7) HashAggregate(keys=[n_name#36], functions=[sum(o_totalprice#10)])
+- Exchange hashpartitioning(n_name#36, 200)
+- *(6) HashAggregate(keys=[n_name#36], functions=[partial_sum(o_totalprice#10)])
+- *(6) Project [o_totalprice#10, n_name#36]
+- *(6) BroadcastHashJoin [c_nationkey#30L], [n_nationkey#35L], Inner, BuildRight
:- *(6) Project [o_totalprice#10, c_nationkey#30L]
: +- *(6) SortMergeJoin [o_custkey#8L], [c_custkey#27L], Inner
: :- *(2) Sort [o_custkey#8L ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(o_custkey#8L, 200)
: : +- *(1) Project [o_custkey#8L, o_totalprice#10]
: : +- *(1) Filter isnotnull(o_custkey#8L)
: : +- *(1) FileScan parquet tpch_sf100_parquet.orders[o_custkey#8L,o_totalprice#10,o_orderdate#15] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/orders], PartitionCount: 2406, PartitionFilters: [], PushedFilters: [IsNotNull(o_custkey)], ReadSchema: struct<o_custkey:bigint,o_totalprice:double>
: +- *(4) Sort [c_custkey#27L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(c_custkey#27L, 200)
: +- *(3) Project [c_custkey#27L, c_nationkey#30L]
: +- *(3) Filter (isnotnull(c_custkey#27L) && isnotnull(c_nationkey#30L))
: +- *(3) FileScan parquet tpch_sf100_parquet.customer[c_custkey#27L,c_nationkey#30L,c_mktsegment#34] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/customer], PartitionCount: 5, PartitionFilters: [], PushedFilters: [IsNotNull(c_custkey), IsNotNull(c_nationkey)], ReadSchema: struct<c_custkey:bigint,c_nationkey:bigint>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
+- *(5) Project [n_nationkey#35L, n_name#36]
+- *(5) Filter isnotnull(n_nationkey#35L)
+- *(5) FileScan parquet tpch_sf100_parquet.nation[n_nationkey#35L,n_name#36] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/nation], PartitionFilters: [], PushedFilters: [IsNotNull(n_nationkey)], ReadSchema: struct<n_nationkey:bigint,n_name:string>
创建Relational cache有两种方式,可以先创建视图,然后通过Cache语法cache 视图的数据,如下所示:
CREATE VIEW nation_cust_cache AS
SELECT n_name, o_custkey, c_custkey, n_nationkey, c_nationkey, o_totalprice, o_orderstatus, o_orderdate
FROM orders, customer, nation
WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey;
CACHE TABLE nation_cust_cache
ENABLE REWRITE
USING parquet;
或者也可以直接创建视图并cache数据。
CACHE TABLE nation_cust_cache
ENABLE REWRITE
USING parquet
AS
SELECT n_name, o_custkey, c_custkey, n_nationkey, c_nationkey, o_totalprice, o_orderstatus, o_orderdate
FROM orders, customer, nation
WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey;
Cache数据完成后,我们重新执行用户查询SQL,执行计划如下:
== Physical Plan ==
*(2) HashAggregate(keys=[n_name#35], functions=[sum(o_totalprice#20)])
+- Exchange hashpartitioning(n_name#35, 200)
+- *(1) HashAggregate(keys=[n_name#35], functions=[partial_sum(o_totalprice#20)])
+- *(1) Project [o_totalprice#20, n_name#35]
+- *(1) Filter (((isnotnull(o_custkey#18L) && isnotnull(c_custkey#26L)) && isnotnull(c_nationkey#29L)) && isnotnull(n_nationkey#34L))
+- *(1) FileScan parquet tpch_sf100_parquet._cache_nation_cust_cache[n_name#35,o_custkey#18L,c_custkey#26L,n_nationkey#34L,c_nationkey#29L,o_totalprice#20] Batched: true, Format: Parquet, Location: FullScanFileMetaWithStats[hdfs://emr-header-1.cluster-100048:9000/user/hive/warehouse/tpch_sf100_..., PartitionFilters: [], PushedFilters: [IsNotNull(o_custkey), IsNotNull(c_custkey), IsNotNull(c_nationkey), IsNotNull(n_nationkey)], ReadSchema: struct<n_name:string,o_custkey:bigint,c_custkey:bigint,n_nationkey:bigint,c_nationkey:bigint,o_to...
可以看到基于cache优化后的执行计划直接从cache中读取数据,省去了两次join的计算时间,整体的执行时间也从16.9s下降到了1.9s。
总结
Relational Cache的强大功能赋予了Spark更多的可能,通过Relational Cache,用户可以提前将任意关系型数据(Table/View/Dataset)cache到任意Spark支持的DataSource中,并支持灵活的cache数据组织方式,基于此,Relational Cache可以在诸多应用场景中帮助用户加速Spark数据分析。在特定的应用场景中,比如针对星型模型多维度数据的聚合分析,可以实现PB级数据的亚秒级响应。