The CACHE TABLE capability
With this syntax, users can define a result set to be cached. It is effectively a temporary table, except that the data lives inside the Spark cluster and is managed by the executors allocated to the Application. Once a cached table is defined, its name can be referenced anywhere in the SQL script, which speeds up data retrieval and avoids unnecessary resource overhead. The result set can be removed from the cache explicitly with the UNCACHE TABLE statement.
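A minimal usage sketch follows (it assumes a SparkSession named spark; hot_users is a made-up name, and the OPTIONS clause for the storage level follows the Spark SQL CACHE TABLE syntax):

// Define a cached, session-scoped result set with an explicit storage level.
spark.sql(
  """CACHE TABLE hot_users
    |OPTIONS ('storageLevel' 'MEMORY_AND_DISK')
    |AS SELECT id, country FROM default.tmp_tbl WHERE id > 0""".stripMargin)

// Reference it as many times as needed within the session.
spark.sql("SELECT country, COUNT(*) AS cnt FROM hot_users GROUP BY country").show()

// Explicitly drop the cached result set when it is no longer needed.
spark.sql("UNCACHE TABLE hot_users")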
A CACHE TABLE example
Example SQL
SELECT b.id, a.country, b.city, b.tag
FROM default.tmp_tbl a
JOIN default.tmp_tbl b
ON a.city IS NOT NULL AND a.id = b.id AND a.id > 0 AND a.country = 'China'
Physical plan before optimization
+- Project [id#153, country#152, city#155, tag#154]
+- BroadcastHashJoin [id#149], [id#153], Inner, BuildLeft, false
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=196]
: +- Project [id#149, country#152]
: +- Filter (isnotnull(id#149) AND (id#149 > 0))
: +- Scan odps du_all.tmp_tbl[id#149,city#151,country#152] Batched: true, DataFilters: [isnotnull(id#149), (id#149 > 0)], Format: Odps, PartitionFilters: [isnotnull(country#152), isnotnull(city#151), (country#152 = China)], PushedFilters: [IsNotNull(id), GreaterThan(id,0)], ReadSchema: struct<id:int,city:string,country:string>
+- Project [id#153, tag#154, city#155]
+- Filter ((id#153 > 0) AND isnotnull(id#153))
+- Scan odps du_all.tmp_tbl[id#153,tag#154,city#155,country#156] Batched: true, DataFilters: [(id#153 > 0), isnotnull(id#153)], Format: Odps, PartitionFilters: [], PushedFilters: [GreaterThan(id,0), IsNotNull(id)], ReadSchema: struct<id:int,tag:string,city:string,country:string>
Physical plan after optimization
Optimized SQL
CACHE TABLE cached_tbl AS (
SELECT * FROM default.tmp_tbl WHERE id > 0
);
SELECT b.id, a.country, b.city, b.tag
FROM cached_tbl a
JOIN cached_tbl b
ON a.city IS NOT NULL AND a.id = b.id AND a.country = 'China'
Physical plan
As the plan below shows, both the left and right children of the BroadcastHashJoin read their data from the same In-memory table, so the source table is scanned only once.
+- Project [id#153, country#152, city#155, tag#154]
+- BroadcastHashJoin [id#149], [id#153], Inner, BuildLeft, false
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=196]
: +- Project [id#149, country#152]
: +- Filter (isnotnull(id#149) AND (id#149 > 0) AND isnotnull(country#152) AND isnotnull(city#151) AND (country#152 = China))
: +- Scan In-memory table
: +- InMemoryRelation [id#149,city#151,country#152], StorageLevel(disk, 1 replicas)
: +- ColumnarToRow
: +- Scan hive default.tmp_tbl[id#149,city#151,country#152] Batched: true, DataFilters: [isnotnull(id#149), (id#149 > 0)], Format: Hive, PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThan(id,0)], ReadSchema: struct<id:int,city:string,country:string>
+- Project [id#153, tag#154, city#155]
+- Filter ((id#153 > 0) AND isnotnull(id#153))
+- Project [id#149 AS id#153, city#151 AS city#154, country#152 AS country#155]
+- Scan In-memory table
+- InMemoryRelation [id#149,city#151,country#152], StorageLevel(disk, 1 replicas)
+- ColumnarToRow
+- Scan hive default.tmp_tbl[id#149,city#151,country#152] Batched: true, DataFilters: [isnotnull(id#149), (id#149 > 0)], Format: Hive, PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThan(id,0)], ReadSchema: struct<id:int,city:string,country:string>
How CACHE TABLE executes
The CACHE TABLE statement defines an aliased temporary dataset that is shared at the session level, and it lets you choose how the data is stored (memory, disk, or mixed). Put simply, the data is cached in the cluster and managed by the BlockManager.
Once such a temporary table is defined, it can be referenced by its alias in any SQL statement, any number of times, without worrying about reading the data repeatedly or paying the cost of pulling it from the source again.
Figure 1. Two SQL queries reading the result set of the same CACHE TABLE
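The same session-level cache can also be managed programmatically through the catalog API. A small sketch, assuming a SparkSession named spark and the source table from the example above (hot_rows is a made-up view name):

import org.apache.spark.storage.StorageLevel

// Register the filtered rows as a temporary view.
spark.sql("SELECT * FROM default.tmp_tbl WHERE id > 0").createOrReplaceTempView("hot_rows")

// Mark the view for caching with an explicit storage level; it is materialized
// the first time a query actually scans it.
spark.catalog.cacheTable("hot_rows", StorageLevel.MEMORY_AND_DISK)

// Subsequent queries in this session read the in-memory copy.
spark.sql("SELECT country, COUNT(*) FROM hot_rows GROUP BY country").show()

// Inspect and release the cache.
println(spark.catalog.isCached("hot_rows"))
spark.catalog.uncacheTable("hot_rows")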
*Why not use a CTE? How does it differ from CACHE TABLE?
Spark's CTE (Common Table Expression) syntax merely substitutes identical subqueries; it behaves more like a query-scoped view. Unlike Hive, which can materialize subqueries via the property hive.optimize.cte.materialize.threshold=1, Spark can only cache data explicitly (though, as one can imagine, this could be hacked into an implicit behavior) through the CACHE TABLE statement.
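To make the contrast concrete, here is a sketch reusing the tables from the earlier example: the CTE below is simply inlined into both join sides, so the source table is still scanned twice, while the CACHE TABLE variant materializes the intermediate result once.

// CTE: expanded into each reference, default.tmp_tbl is scanned twice.
spark.sql(
  """WITH filtered AS (SELECT * FROM default.tmp_tbl WHERE id > 0)
    |SELECT b.id, a.country, b.city, b.tag
    |FROM filtered a JOIN filtered b ON a.id = b.id AND a.country = 'China'""".stripMargin)

// CACHE TABLE: the filtered rows are materialized once and both join sides
// read the same in-memory copy.
spark.sql("CACHE TABLE filtered_cached AS SELECT * FROM default.tmp_tbl WHERE id > 0")
spark.sql(
  """SELECT b.id, a.country, b.city, b.tag
    |FROM filtered_cached a JOIN filtered_cached b ON a.id = b.id AND a.country = 'China'""".stripMargin)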
Doesn't Spark already share stages? Why provide CACHE TABLE on top of that?
Indeed, Spark's plan optimizer provides the ReuseExchangeAndSubquery rule, which aims to share identical stages (they must end in an exchange operator). Unfortunately, identical subqueries in different query scopes usually end up different after optimization, for example with different filter conditions or different column pruning, so they can no longer be shared.
In practice, whether with CTEs or other subquery patterns, we can manually merge the underlying stage/table-scan logic and define it as a temporary table via CACHE TABLE, thereby sharing the data. Naturally, Spark can also be enhanced to automate this process (I have built such an optimization and applied it in production scenarios).
Code walkthrough of caching the RDD (columnar-based)
The diagram below describes the core flow of RDD caching. As it shows, Spark's RDD caching strategy is quite flexible: it supports specifying the storage level and choosing whether the data is stored serialized (and, before serialization, the property spark.sql.inMemoryColumnarStorage.compressed=true can be set to decide whether the columns are compressed).
As the diagram above makes clear, Spark's RDD caching mode is very flexible: it can use memory and disk at the same time, maximizing storage performance (thanks to Spark's Tungsten unified memory management model).
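For reference, a brief sketch of the knobs mentioned above; both values are read when the cached buffers are built, so they must be set before the CACHE TABLE statement runs (the storage level itself is chosen per cached table via the OPTIONS clause shown earlier):

// Compress the cached columns (default true) and control how many rows go into one CachedBatch.
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", "true")
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "10000")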
Generating the RDD[ColumnarBatch]
For a table-scan task, if the columns being read support columnar read/write, Spark by default produces an RDD[ColumnarBatch] instance and loads the data from the source directly in columnar format.
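Conceptually, the dispatch looks like the hypothetical helper below (this is not Spark source; SparkPlan.supportsColumnar, execute() and executeColumnar() are the actual entry points):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical helper: materialize a plan through its columnar path when supported,
// otherwise fall back to the row-based path.
def materializeScan(scan: SparkPlan): Either[RDD[InternalRow], RDD[ColumnarBatch]] =
  if (scan.supportsColumnar) Right(scan.executeColumnar())
  else Left(scan.execute())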
Generating the RDD[InternalRow]
Inserting the ColumnarToRowExec/RowToColumnarExec operators
Working bottom-up over the optimizedPlan, the rule inserts row-to-columnar and columnar-to-row physical plan nodes at the appropriate positions. Because InMemoryTableScanExec does not accept an RDD[ColumnarBatch] as its input RDD, a ColumnarToRowExec instance is inserted between it and a columnar child so that InMemoryTableScanExec can read the data correctly.
/**
* Apply any user defined [[ColumnarRule]]s and find the correct place to insert transitions
* to/from columnar formatted data.
*
* @param columnarRules custom columnar rules
* @param outputsColumnar whether or not the produced plan should output columnar format.
*/
case class ApplyColumnarRulesAndInsertTransitions(
columnarRules: Seq[ColumnarRule],
outputsColumnar: Boolean)
extends Rule[SparkPlan] {
/**
* Inserts an transition to columnar formatted data.
*/
private def insertRowToColumnar(plan: SparkPlan): SparkPlan = {
if (!plan.supportsColumnar) {
// The tree feels kind of backwards
// Columnar Processing will start here, so transition from row to columnar
RowToColumnarExec(insertTransitions(plan, outputsColumnar = false))
} else if (!plan.isInstanceOf[RowToColumnarTransition]) {
plan.withNewChildren(plan.children.map(insertRowToColumnar))
} else {
plan
}
}
/**
* Inserts RowToColumnarExecs and ColumnarToRowExecs where needed.
*/
private def insertTransitions(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
if (outputsColumnar) {
insertRowToColumnar(plan)
} else if (plan.supportsColumnar && !plan.supportsRowBased) {
// `outputsColumnar` is false but the plan only outputs columnar format, so add a
// to-row transition here.
ColumnarToRowExec(insertRowToColumnar(plan))
} else if (!plan.isInstanceOf[ColumnarToRowTransition]) {
plan.withNewChildren(plan.children.map(insertTransitions(_, outputsColumnar = false)))
} else {
plan
}
}
def apply(plan: SparkPlan): SparkPlan = {
var preInsertPlan: SparkPlan = plan
columnarRules.foreach(r => preInsertPlan = r.preColumnarTransitions(preInsertPlan))
var postInsertPlan = insertTransitions(preInsertPlan, outputsColumnar)
columnarRules.reverse.foreach(r => postInsertPlan = r.postColumnarTransitions(postInsertPlan))
postInsertPlan
}
}
Executing ColumnarToRowExec to produce the RDD[InternalRow]
Generating the RDD[CachedBatch]
InMemoryTableScanExec retrieves data from the internal relation and, based on the source schema, decides how to build CachedBatch instances, the data structure used for the cached data.
case class InMemoryTableScanExec(
attributes: Seq[Attribute],
predicates: Seq[Expression],
@transient relation: InMemoryRelation)
extends LeafExecNode {
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
override val nodeName: String = {
relation.cacheBuilder.tableName match {
case Some(_) =>
"Scan " + relation.cacheBuilder.cachedName
case _ =>
super.nodeName
}
}
override def vectorTypes: Option[Seq[String]] =
relation.cacheBuilder.serializer.vectorTypes(attributes, conf)
override def supportsRowBased: Boolean = true
/**
* If true, get data from ColumnVector in ColumnarBatch, which are generally faster.
* If false, get data from UnsafeRow build from CachedBatch
*/
override val supportsColumnar: Boolean = {
conf.cacheVectorizedReaderEnabled &&
!WholeStageCodegenExec.isTooManyFields(conf, relation.schema) &&
relation.cacheBuilder.serializer.supportsColumnarOutput(relation.schema)
}
private lazy val columnarInputRDD: RDD[ColumnarBatch] = {
val numOutputRows = longMetric("numOutputRows")
val buffers = filteredCachedBatches()
relation.cacheBuilder.serializer.convertCachedBatchToColumnarBatch(
buffers,
relation.output,
attributes,
conf).map { cb =>
numOutputRows += cb.numRows()
cb
}
}
private lazy val inputRDD: RDD[InternalRow] = {
if (enableAccumulatorsForTest) {
readPartitions.setValue(0)
readBatches.setValue(0)
}
val numOutputRows = longMetric("numOutputRows")
// Using these variables here to avoid serialization of entire objects (if referenced
// directly) within the map Partitions closure.
val relOutput = relation.output
val serializer = relation.cacheBuilder.serializer
// update SQL metrics
val withMetrics =
filteredCachedBatches().mapPartitionsInternal { iter =>
if (enableAccumulatorsForTest && iter.hasNext) {
readPartitions.add(1)
}
iter.map { batch =>
if (enableAccumulatorsForTest) {
readBatches.add(1)
}
numOutputRows += batch.numRows
batch
}
}
serializer.convertCachedBatchToInternalRow(withMetrics, relOutput, attributes, conf)
}
override def output: Seq[Attribute] = attributes
private def updateAttribute(expr: Expression): Expression = {
// attributes can be pruned so using relation's output.
// E.g., relation.output is [id, item] but this scan's output can be [item] only.
val attrMap = AttributeMap(relation.cachedPlan.output.zip(relation.output))
expr.transform {
case attr: Attribute => attrMap.getOrElse(attr, attr)
}
}
// The cached version does not change the outputPartitioning of the original SparkPlan.
// But the cached version could alias output, so we need to replace output.
override def outputPartitioning: Partitioning = {
relation.cachedPlan.outputPartitioning match {
case e: Expression => updateAttribute(e).asInstanceOf[Partitioning]
case other => other
}
}
// The cached version does not change the outputOrdering of the original SparkPlan.
// But the cached version could alias output, so we need to replace output.
override def outputOrdering: Seq[SortOrder] =
relation.cachedPlan.outputOrdering.map(updateAttribute(_).asInstanceOf[SortOrder])
lazy val enableAccumulatorsForTest: Boolean = conf.inMemoryTableScanStatisticsEnabled
// Accumulators used for testing purposes
lazy val readPartitions = sparkContext.longAccumulator
lazy val readBatches = sparkContext.longAccumulator
private val inMemoryPartitionPruningEnabled = conf.inMemoryPartitionPruning
private def filteredCachedBatches(): RDD[CachedBatch] = {
val buffers = relation.cacheBuilder.cachedColumnBuffers
if (inMemoryPartitionPruningEnabled) {
val filterFunc = relation.cacheBuilder.serializer.buildFilter(predicates, relation.output)
buffers.mapPartitionsWithIndexInternal(filterFunc)
} else {
buffers
}
}
protected override def doExecute(): RDD[InternalRow] = {
inputRDD
}
protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
columnarInputRDD
}
}
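As the supportsColumnar override above shows, reading the cached data back as ColumnarBatch requires the vectorized reader to be enabled, the cached schema not to exceed the codegen field limit, and the serializer to support columnar output. A small sketch of the relevant configuration (keys as defined in SQLConf):

// Gate for the vectorized read path of cached data (conf.cacheVectorizedReaderEnabled).
spark.conf.set("spark.sql.inMemoryColumnarStorage.enableVectorizedReader", "true")
// WholeStageCodegenExec.isTooManyFields compares the cached schema's field count
// against this threshold (default 100).
spark.conf.set("spark.sql.codegen.maxFields", "100")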
Loading and converting the RDD via InMemoryRelation
When the InMemoryTableScanExec operator runs, InMemoryRelation::CachedRDDBuilder::cachedColumnBuffers is invoked while the RDD is being computed, and the persistence level is set explicitly through RDD::persist() so that the data is cached correctly once this RDD's compute is triggered.
private[sql]
case class CachedRDDBuilder(
serializer: CachedBatchSerializer,
storageLevel: StorageLevel,
@transient cachedPlan: SparkPlan,
tableName: Option[String]) {
@transient @volatile private var _cachedColumnBuffers: RDD[CachedBatch] = null
@transient @volatile private var _cachedColumnBuffersAreLoaded: Boolean = false
val sizeInBytesStats: LongAccumulator = cachedPlan.session.sparkContext.longAccumulator
val rowCountStats: LongAccumulator = cachedPlan.session.sparkContext.longAccumulator
val cachedName = tableName.map(n => s"In-memory table $n")
.getOrElse(StringUtils.abbreviate(cachedPlan.toString, 1024))
def cachedColumnBuffers: RDD[CachedBatch] = {
if (_cachedColumnBuffers == null) {
synchronized {
if (_cachedColumnBuffers == null) {
_cachedColumnBuffers = buildBuffers()
}
}
}
_cachedColumnBuffers
}
def clearCache(blocking: Boolean = false): Unit = {
if (_cachedColumnBuffers != null) {
synchronized {
if (_cachedColumnBuffers != null) {
_cachedColumnBuffers.unpersist(blocking)
_cachedColumnBuffers = null
}
}
}
}
def isCachedColumnBuffersLoaded: Boolean = {
if (_cachedColumnBuffers != null) {
synchronized {
return _cachedColumnBuffers != null && isCachedRDDLoaded
}
}
false
}
private def isCachedRDDLoaded: Boolean = {
_cachedColumnBuffersAreLoaded || {
val bmMaster = SparkEnv.get.blockManager.master
val rddLoaded = _cachedColumnBuffers.partitions.forall { partition =>
bmMaster.getBlockStatus(RDDBlockId(_cachedColumnBuffers.id, partition.index), false)
.exists { case(_, blockStatus) => blockStatus.isCached }
}
if (rddLoaded) {
_cachedColumnBuffersAreLoaded = rddLoaded
}
rddLoaded
}
}
private def buildBuffers(): RDD[CachedBatch] = {
val cb = if (cachedPlan.supportsColumnar &&
serializer.supportsColumnarInput(cachedPlan.output)) {
// By default the serializer is a DefaultCachedBatchSerializer instance, which does not support converting ColumnarBatch into CachedBatch, so execution never reaches this branch.
serializer.convertColumnarBatchToCachedBatch(
cachedPlan.executeColumnar(),
cachedPlan.output,
storageLevel,
cachedPlan.conf)
} else {
// As mentioned earlier, a ColumnarToRowExec operator is usually inserted above cachedPlan to turn the RDD[ColumnarBatch] into an RDD[InternalRow], so execution ends up here.
serializer.convertInternalRowToCachedBatch(
cachedPlan.execute(),
cachedPlan.output,
storageLevel,
cachedPlan.conf)
}
// Persist the RDD[CachedBatch]
val cached = cb.map { batch =>
sizeInBytesStats.add(batch.sizeInBytes)
rowCountStats.add(batch.numRows)
batch
}.persist(storageLevel) // Explicitly set this RDD's persistence level. An appropriate level must be chosen here: the default is NONE, i.e. no persistence, and nothing would be cached.
cached.setName(cachedName)
cached
}
}
Generating the RDD[CachedBatch]
This in effect calls DefaultCachedBatchSerializer::convertInternalRowToCachedBatch to convert the RDD[InternalRow] into an RDD[CachedBatch].
/**
* The default implementation of CachedBatchSerializer.
*/
class DefaultCachedBatchSerializer extends SimpleMetricsCachedBatchSerializer {
override def supportsColumnarInput(schema: Seq[Attribute]): Boolean = false
override def convertColumnarBatchToCachedBatch(
input: RDD[ColumnarBatch],
schema: Seq[Attribute],
storageLevel: StorageLevel,
conf: SQLConf): RDD[CachedBatch] =
throw new IllegalStateException("Columnar input is not supported")
override def convertInternalRowToCachedBatch(
input: RDD[InternalRow],
schema: Seq[Attribute],
storageLevel: StorageLevel,
conf: SQLConf): RDD[CachedBatch] = {
// How many rows of the data to be cached go into one batch. This is unrelated to the
// batch size produced by scan splits and corresponds to the property:
// spark.sql.inMemoryColumnarStorage.batchSize=10000
val batchSize = conf.columnBatchSize
// The data to be cached can optionally be compressed
val useCompression = conf.useCompression
// Convert the input RDD[InternalRow] into an RDD[CachedBatch]
convertForCacheInternal(input, schema, batchSize, useCompression)
}
def convertForCacheInternal(
input: RDD[InternalRow],
output: Seq[Attribute],
batchSize: Int,
useCompression: Boolean): RDD[CachedBatch] = {
input.mapPartitionsInternal { rowIterator =>
new Iterator[DefaultCachedBatch] {
def next(): DefaultCachedBatch = {
// Create one ColumnBuilder instance per output attribute to build the columnar data
val columnBuilders = output.map { attribute =>
ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
}.toArray
var rowCount = 0
var totalSize = 0L
// Iterate over the upstream RDD's output and bound the size of each generated CachedBatch.
// With default settings the limits are:
// batchSize = 10000
// ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE = 4 * 1024 * 1024
while (rowIterator.hasNext && rowCount < batchSize
&& totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) {
val row = rowIterator.next()
// Added for SPARK-6082. This assertion can be useful for scenarios when something
// like Hive TRANSFORM is used. The external data generation script used in TRANSFORM
// may result malformed rows, causing ArrayIndexOutOfBoundsException, which is somewhat
// hard to decipher.
assert(
row.numFields == columnBuilders.length,
s"Row column number mismatch, expected ${output.size} columns, " +
s"but got ${row.numFields}." +
s"\nRow content: $row")
var i = 0
totalSize = 0 // reset, then re-accumulate the latest total byte size across all columns
// Walk every column of the input row and append it to the corresponding ColumnBuilder
while (i < row.numFields) {
columnBuilders(i).appendFrom(row, i)
// one more row has been appended to column i
totalSize += columnBuilders(i).columnStats.sizeInBytes
i += 1
}
rowCount += 1
}
// Collect static statistics for the current batch, e.g. byte sizes before/after compression and total row count.
// stats also holds more detailed metrics for every column, for example:
// for an Int column (lower/upper are the min/max of all integers of that column within the batch):
//   Array[Any](lower, upper, nullCount, count, sizeInBytes)
// for a String column (lower/upper are the min/max of all UTF8String values of that column within the batch, ordered by bytes):
//   Array[Any](lower, upper, nullCount, count, sizeInBytes)
// These stats make it possible to apply additional filtering later, when the data flows through InMemoryTableScanExec.
val stats = InternalRow.fromSeq(
columnBuilders.flatMap(_.columnStats.collectedStatistics).toSeq)
// Build the DefaultCachedBatch instance
DefaultCachedBatch(rowCount, columnBuilders.map { builder =>
JavaUtils.bufferToArray(builder.build())
}, stats)
}
def hasNext: Boolean = rowIterator.hasNext
}
}
}
}
Caching the RDD[CachedBatch]
As noted above, the RDD returned by InMemoryRelation::buildBuffers() has its persistence level set explicitly, defaulting to MEMORY_AND_DISK. When a downstream RDD triggers compute, RDD[CachedBatch]::iterator(split: Partition, context: TaskContext) is therefore called recursively (the table scan assembles the CachedBatch instances), and the result set, i.e. the temporary table data produced by CACHE TABLE, is cached afterwards.
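The effect can be observed from the driver side. A small sketch, assuming the cached_tbl defined earlier and a SparkSession named spark (getPersistentRDDs lists every RDD whose storage level has been set):

// An eager CACHE TABLE materializes and persists the RDD[CachedBatch] immediately,
// so it shows up among the persistent RDDs along with its storage level.
spark.sparkContext.getPersistentRDDs.foreach { case (id, rdd) =>
  println(s"RDD $id: name=${rdd.name}, storageLevel=${rdd.getStorageLevel}")
}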
Persisting the RDD[CachedBatch]
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext