CacheManager原理图
入口:RDD 的 iterator 方法
/**
 * Entry point for reading one partition of this RDD.
 *
 * If a storage level other than `StorageLevel.NONE` is set, the partition is requested
 * through the cache manager, which looks for previously persisted data before falling
 * back to computation. Otherwise the partition is obtained directly via
 * `computeOrReadCheckpoint`.
 *
 * @param split   the partition to read
 * @param context the context of the task requesting the partition
 * @return an iterator over the elements of the partition
 */
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel == StorageLevel.NONE) {
    // No persistence requested: compute the partition (or read its checkpoint).
    computeOrReadCheckpoint(split, context)
  } else {
    // A persistence level is set: let the cache manager try the stored data first
    // instead of directly re-running the parent RDD's operators.
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  }
}
/**
 * Returns an iterator over the given RDD partition, reading it from the block manager
 * when a block for it is present and computing it otherwise.
 *
 * On a miss, the partition is obtained through `rdd.computeOrReadCheckpoint`, stored via
 * `putInBlockManager`, and the resulting block status updates are appended to the task
 * metrics. When the task is running locally the computed values are returned without
 * being stored.
 *
 * @param rdd          the RDD that owns the partition
 * @param partition    the partition to fetch or compute
 * @param context      the context of the running task (metrics, interruption)
 * @param storageLevel the level to store the partition at when it has to be computed
 * @return an iterator over the partition's values
 */
def getOrCompute[T](
    rdd: RDD[T],
    partition: Partition,
    context: TaskContext,
    storageLevel: StorageLevel): Iterator[T] = {
  val key = RDDBlockId(rdd.id, partition.index)
  logDebug(s"Looking for partition $key")

  blockManager.get(key) match {
    case Some(cachedBlock) =>
      // The partition is already materialized: account for the bytes read and hand back
      // its values, counting each record as it is consumed.
      val blockInputMetrics = cachedBlock.inputMetrics
      val existingMetrics = context.taskMetrics
        .getInputMetricsForReadMethod(blockInputMetrics.readMethod)
      existingMetrics.incBytesRead(blockInputMetrics.bytesRead)

      val cachedIter = cachedBlock.data.asInstanceOf[Iterator[T]]
      new InterruptibleIterator[T](context, cachedIter) {
        override def next(): T = {
          existingMetrics.incRecordsRead(1)
          delegate.next()
        }
      }

    case None =>
      // Nothing stored for this partition. Acquire the loading lock for it; if another
      // thread was already loading it, its values are handed back here.
      acquireLockForPartition[T](key) match {
        case Some(valuesFromOtherLoader) =>
          new InterruptibleIterator[T](context, valuesFromOtherLoader)

        case None =>
          // We have to load the partition ourselves.
          try {
            logInfo(s"Partition $key not found, computing it")
            // Reads the checkpoint if the RDD was checkpointed, otherwise recomputes.
            val computedValues = rdd.computeOrReadCheckpoint(partition, context)

            if (context.isRunningLocally) {
              // Local tasks do not persist their result.
              computedValues
            } else {
              // Store the freshly obtained values and record which blocks changed.
              val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
              val cachedValues =
                putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
              val metrics = context.taskMetrics
              val previouslyUpdated =
                metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
              metrics.updatedBlocks = Some(previouslyUpdated ++ updatedBlocks.toSeq)
              new InterruptibleIterator(context, cachedValues)
            }
          } finally {
            // Always drop the loading marker and wake up threads waiting on it.
            loading.synchronized {
              loading.remove(key)
              loading.notifyAll()
            }
          }
      }
  }
}
/**
 * Stores the values of a partition in the block manager and returns an iterator over them.
 *
 * @param key                   the block id to store the values under
 * @param values                the computed values of the partition
 * @param level                 the storage level requested for the block
 * @param updatedBlocks         buffer that receives the block status updates caused by the put
 * @param effectiveStorageLevel if defined, the level actually used for this put instead of
 *                              `level` (used below to retry a failed memory put on disk)
 * @return an iterator over the stored (or, if storing was not possible, the original) values
 * @throws BlockException if a block written without memory cannot be read back
 */
private def putInBlockManager[T](
    key: BlockId,
    values: Iterator[T],
    level: StorageLevel,
    updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
    effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {

  val putLevel = effectiveStorageLevel.getOrElse(level)

  if (putLevel.useMemory) {
    /*
     * This RDD is to be cached in memory. We cannot hand the computed values to the
     * BlockManager as an iterator and expect to read them back later, because the
     * partition may be dropped from the memory store before we get it back.
     *
     * We must also avoid unrolling the entire partition in memory at once, which could
     * cause an OOM if the JVM lacks space for this single partition. Instead the values
     * are unrolled cautiously, potentially aborting and dropping the partition to disk
     * if applicable.
     */
    blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
      case Left(unrolled) =>
        // The whole partition was unrolled successfully, so cache it in memory.
        updatedBlocks ++=
          blockManager.putArray(key, unrolled, level, tellMaster = true, effectiveStorageLevel)
        unrolled.iterator.asInstanceOf[Iterator[T]]

      case Right(remaining) =>
        // There is not enough space to cache this partition in memory.
        val returnValues = remaining.asInstanceOf[Iterator[T]]
        if (!putLevel.useDisk) {
          returnValues
        } else {
          // The level also allows disk, so retry the put with a disk-only level.
          logWarning(s"Persisting partition $key to disk instead.")
          val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
            useOffHeap = false, deserialized = false, putLevel.replication)
          putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
        }
    }
  } else {
    /*
     * This RDD is not to be cached in memory, so the computed values can be passed to the
     * BlockManager directly as an iterator rather than first fully unrolling them in memory.
     */
    updatedBlocks ++=
      blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
    // Read the block back so the caller iterates over the stored copy.
    blockManager.get(key).map(_.data.asInstanceOf[Iterator[T]]).getOrElse {
      logInfo(s"Failure to store $key")
      throw new BlockException(key, s"Block manager failed to return cached value for $key!")
    }
  }
}