Spark SQL (5): CacheManager
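After Spark SQL produces the analyzed plan, the plan goes through a withCachedData step. The CacheManager keeps a cache of logical plans, and any fragment of the current plan that would produce the same result as a cached plan is replaced by the corresponding cached node. This happens in QueryExecution.withCachedData: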
lazy val withCachedData: LogicalPlan = {
  assertAnalyzed()
  assertSupported()
  sparkSession.sharedState.cacheManager.useCachedData(analyzed)
}
/** Replaces segments of the given logical plan with cached versions where possible. */
def useCachedData(plan: LogicalPlan): LogicalPlan = {
  val newPlan = plan transformDown {
    // Do not lookup the cache by hint node. Hint node is special, we should ignore it when
    // canonicalizing plans, so that plans which are same except hint can hit the same cache.
    // However, we also want to keep the hint info after cache lookup. Here we skip the hint
    // node, so that the returned caching plan won't replace the hint node and drop the hint info
    // from the original plan.
    case hint: ResolvedHint => hint
    case currentFragment =>
      lookupCachedData(currentFragment)
        .map(_.cachedRepresentation.withOutput(currentFragment.output))
        .getOrElse(currentFragment)
  }
  newPlan transformAllExpressions {
    case s: SubqueryExpression => s.withNewPlan(useCachedData(s.plan))
  }
}
The key piece here is the CacheManager.lookupCachedData method:
/** Optionally returns cached data for the given [[LogicalPlan]]. */
def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock {
  cachedData.asScala.find(cd => plan.sameResult(cd.plan))
}
@transient
private val cachedData = new java.util.LinkedList[CachedData]
/** Holds a cached logical plan and its data */
case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
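As the code above shows, CacheManager keeps a linked list of entries, each pairing a LogicalPlan with an InMemoryRelation (a leaf node), so a matching plan fragment can be replaced directly with the cached result when the query runs.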
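The lookup-and-replace idea can be sketched without Spark. The following is a simplified toy model, not Spark's implementation: Plan, Scan, Filter, Project, InMemory and ToyCacheManager are hypothetical stand-ins, plain case-class equality stands in for sameResult, and locking, hints and subqueries are left out.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Spark's LogicalPlan nodes.
sealed trait Plan {
  def children: Seq[Plan]
  def withChildren(newChildren: Seq[Plan]): Plan

  // Pre-order rewrite: apply the rule to this node first,
  // then recurse into the children of the (possibly replaced) node.
  def transformDown(rule: PartialFunction[Plan, Plan]): Plan = {
    val afterRule = rule.applyOrElse(this, identity[Plan])
    afterRule.withChildren(afterRule.children.map(_.transformDown(rule)))
  }
}

case class Scan(table: String) extends Plan {
  def children: Seq[Plan] = Nil
  def withChildren(newChildren: Seq[Plan]): Plan = this
}

case class Filter(condition: String, child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  def withChildren(newChildren: Seq[Plan]): Plan = copy(child = newChildren.head)
}

case class Project(columns: Seq[String], child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  def withChildren(newChildren: Seq[Plan]): Plan = copy(child = newChildren.head)
}

// Toy counterpart of InMemoryRelation: a leaf that stands for cached data.
case class InMemory(name: String) extends Plan {
  def children: Seq[Plan] = Nil
  def withChildren(newChildren: Seq[Plan]): Plan = this
}

case class CachedData(plan: Plan, cachedRepresentation: InMemory)

class ToyCacheManager {
  private val cachedData = mutable.ListBuffer.empty[CachedData]

  // Register a plan unless an equal plan is already cached.
  def cacheQuery(plan: Plan, name: String): Unit =
    if (lookupCachedData(plan).isEmpty) cachedData += CachedData(plan, InMemory(name))

  // Assumption: structural equality replaces Spark's sameResult check.
  def lookupCachedData(plan: Plan): Option[CachedData] =
    cachedData.find(_.plan == plan)

  // Replace every fragment that matches a cached plan with its cached leaf.
  def useCachedData(plan: Plan): Plan = plan.transformDown {
    case fragment =>
      lookupCachedData(fragment).map(_.cachedRepresentation).getOrElse(fragment)
  }
}
```

After caching Filter("id > 1", Scan("t")) under some name, calling useCachedData on a Project built over an equal Filter returns the Project with the whole Filter subtree swapped for the InMemory leaf, while plans that have no cached match come back unchanged.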
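This raises a question: when are entries added to this list? They are added when the Dataset's persist method is called, which delegates to CacheManager.cacheQuery: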
/**
 * Caches the data produced by the logical representation of the given [[Dataset]].
 * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
 * recomputing the in-memory columnar representation of the underlying table is expensive.
 */
def cacheQuery(
    query: Dataset[_],
    tableName: Option[String] = None,
    storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
  val planToCache = query.logicalPlan
  if (lookupCachedData(planToCache).nonEmpty) {
    logWarning("Asked to cache already cached data.")
  } else {
    val sparkSession = query.sparkSession
    val inMemoryRelation = InMemoryRelation(
      sparkSession.sessionState.conf.useCompression,
      sparkSession.sessionState.conf.columnBatchSize, storageLevel,
      sparkSession.sessionState.executePlan(AnalysisBarrier(planToCache)).executedPlan,
      tableName,
      planToCache.stats)
    cachedData.add(CachedData(planToCache, inMemoryRelation))
  }
}
def persist(): this.type = {
  sparkSession.sharedState.cacheManager.cacheQuery(this)
  this
}
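Putting it together: useCachedData walks the plan top-down with transformDown (a pre-order traversal), compares each fragment against the logical plans stored in cachedData, and replaces the whole subtree with the cached node when one matches.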
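To observe this from user code, one option is to persist a Dataset and then inspect the withCachedData plan of a separately built but equivalent query. The following is a minimal sketch, assuming Spark SQL is on the classpath and a local master is acceptable; the object name CacheLookupDemo and the helper planAfterPersist are made up for illustration, and the exact plan text varies between Spark versions.

```scala
import org.apache.spark.sql.SparkSession

object CacheLookupDemo {
  // Persists one Dataset, then builds an equivalent query from scratch and
  // returns the text of its withCachedData plan.
  def planAfterPersist(spark: SparkSession): String = {
    import spark.implicits._

    val cached = spark.range(0, 100).filter($"id" % 2 === 0)
    cached.persist() // registers the logical plan with the CacheManager

    try {
      // A different Dataset object whose plan contains the same Filter-over-Range fragment.
      val query = spark.range(0, 100).filter($"id" % 2 === 0).groupBy().count()
      query.queryExecution.withCachedData.toString
    } finally {
      cached.unpersist() // remove the cache entry again
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("cache-lookup-demo")
      .getOrCreate()
    try println(planAfterPersist(spark)) finally spark.stop()
  }
}
```

If the lookup works as described, the printed plan should show an InMemoryRelation node under the aggregate in place of the Filter and Range nodes, even though the second query was never persisted itself.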