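The example in "Spark Source Code Analysis – SparkContext" was only analyzed as far as sc.runJob.
So how does it finally get executed? The DAGScheduler splits the job into stages, wraps each stage into a TaskSet, and submits it to the TaskScheduler; the tasks then wait to be scheduled and finally run on an Executor.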
val sc = new SparkContext(……)
val textFile = sc.textFile("README.md")
textFile.filter(line => line.contains("Spark")).count()
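This is a fairly simple example with no shuffle. Let's look at how it is executed on the Executor.
First, this job has only one stage, so only ResultTasks are produced.
The key statement that gets executed is: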
func(context, rdd.iterator(split, context))
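In this example, func is count(), which produces the final result, and rdd is the last RDD before count, i.e. the RDD produced by filter.
As you can see, an RDD chain in Spark is not executed from front to back but derived from back to front. Why? Because cache and checkpoint have to be taken into account.
So a stage keeps only its last RDD, and the other RDDs are reached by walking backwards through dep. Here rdd.iterator is called to read the last RDD.
May I say that iterator is the most central function in Spark? :-)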
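The back-to-front evaluation described above can be illustrated with a toy model. This is only a sketch: MiniRDD, SourceMiniRDD, FilteredMiniRDD and countFunc are hypothetical names invented for illustration, not Spark classes, and the real iterator additionally consults the cache and checkpoint.

```scala
// Toy model (hypothetical names, not Spark's classes): each dataset knows only
// its parent, and computing a partition pulls from the parent's iterator.
abstract class MiniRDD[T] {
  def compute(split: Int): Iterator[T]
  // Entry point used by the "task"; here it simply delegates to compute.
  final def iterator(split: Int): Iterator[T] = compute(split)
  def filter(p: T => Boolean): MiniRDD[T] = new FilteredMiniRDD(this, p)
}

class SourceMiniRDD[T](partitions: Vector[Seq[T]]) extends MiniRDD[T] {
  def compute(split: Int): Iterator[T] = partitions(split).iterator
}

class FilteredMiniRDD[T](parent: MiniRDD[T], p: T => Boolean) extends MiniRDD[T] {
  // Walks backwards: asks the parent for its iterator, then filters lazily.
  def compute(split: Int): Iterator[T] = parent.iterator(split).filter(p)
}

object PullDemo {
  // Plays the role of func in func(context, rdd.iterator(split, context)).
  def countFunc[T](it: Iterator[T]): Long = {
    var n = 0L
    while (it.hasNext) { it.next(); n += 1 }
    n
  }

  def run(): Long = {
    val lines = new SourceMiniRDD(Vector(Seq("Apache Spark", "Hadoop", "Spark SQL")))
    val last = lines.filter(_.contains("Spark")) // the only dataset the "task" holds
    countFunc(last.iterator(0))
  }
}
```

Calling PullDemo.run() only touches the last dataset; the source data is read because the filter's compute asks its parent for an iterator, which is the same direction of travel as in the stage described above.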
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
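If the RDD has a storage level set, i.e. its result is to be cached in memory or on disk, cacheManager.getOrCompute is called to read it;
otherwise the data is read directly from the checkpoint or computed.
The CacheManager takes care of reading data from the cache,
or of recomputing the data and then caching it.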
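The effect of this dispatch can be sketched with a small stand-in. Everything here (ToyRDD, the Level objects, the plain mutable map used as a cache) is a hypothetical simplification and not Spark's API; it only shows that a dataset with a storage level is computed once, while one without is recomputed on every read.

```scala
import scala.collection.mutable

object IteratorDispatchDemo {
  // Hypothetical stand-ins for StorageLevel.NONE vs. a real storage level.
  sealed trait Level
  case object NoneLevel extends Level
  case object MemoryOnly extends Level

  class ToyRDD(val id: Int, val level: Level, data: Seq[Int]) {
    var computeCalls = 0
    private def compute(split: Int): Iterator[Int] = { computeCalls += 1; data.iterator }

    // Mirrors the shape of the dispatch: cache path if a level is set, else compute.
    def iterator(split: Int, cache: mutable.Map[String, Seq[Int]]): Iterator[Int] =
      if (level != NoneLevel) {
        val key = "rdd_%d_%d".format(id, split)
        cache.getOrElseUpdate(key, compute(split).toVector).iterator
      } else {
        compute(split)
      }
  }

  // Reads each dataset three times and reports how often compute actually ran.
  def run(): (Int, Int) = {
    val cache = mutable.Map.empty[String, Seq[Int]]
    val cached = new ToyRDD(1, MemoryOnly, Seq(1, 2, 3))
    val plain = new ToyRDD(2, NoneLevel, Seq(1, 2, 3))
    for (_ <- 1 to 3) {
      cached.iterator(0, cache).foreach(_ => ())
      plain.iterator(0, cache).foreach(_ => ())
    }
    (cached.computeCalls, plain.computeCalls)
  }
}
```

The sketch is single-threaded on purpose; guarding against two threads loading the same partition is a separate concern.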
private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
  private val loading = new HashSet[String]

  /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
  def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
      : Iterator[T] = {
    val key = "rdd_%d_%d".format(rdd.id, split.index)
    blockManager.get(key) match { // look up the cached value in the blockManager
      case Some(cachedValues) =>
        // Partition is in cache (it was cached earlier), so just return its values
        return cachedValues.asInstanceOf[Iterator[T]]

      case None =>
        // Nothing was read, so the partition was never cached and must be loaded
        // (computed or read from the checkpoint)
        // Mark the split as loading (unless someone else marks it first)
        loading.synchronized { // lock to avoid loading the same partition more than once
          if (loading.contains(key)) {
            while (loading.contains(key)) {
              // someone else is already loading it, so just wait
              try {loading.wait()} catch {case _ : Throwable =>}
            }
            // See whether someone else has successfully loaded it. The main way this would fail
            // is for the RDD-level cache eviction policy if someone else has loaded the same RDD
            // partition but we didn't want to make space for it. However, that case is unlikely
            // because it's unlikely that two threads would work on the same RDD partition. One
            // downside of the current code is that threads wait serially if this does happen.
            blockManager.get(key) match {
              case Some(values) =>
                return values.asInstanceOf[Iterator[T]]
              case None =>
                logInfo("Whoever was loading " + key + " failed; we'll try it ourselves")
                loading.add(key)
            }
          } else {
            loading.add(key) // record the key and start loading
          }
        }
        try {
          // If we got here, we have to load the split
          logInfo("Computing partition " + split)
          // loading means reading the checkpoint or recomputing
          val computedValues = rdd.computeOrReadCheckpoint(split, context)
          // Persist the result, so long as the task is not running locally
          if (context.runningLocally) { return computedValues }
          val elements = new ArrayBuffer[Any]
          elements ++= computedValues
          // cache the newly produced data through blockManager.put
          blockManager.put(key, elements, storageLevel, true)
          return elements.iterator.asInstanceOf[Iterator[T]]
        } finally {
          loading.synchronized {
            loading.remove(key)
            loading.notifyAll()
          }
        }
    }
  }
}
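The "mark as loading, wait, re-check" pattern used by getOrCompute can be tried out in isolation. The following is a minimal sketch under stated assumptions: ToyCache and ToyCacheDemo are hypothetical names, a plain map replaces the BlockManager, and unlike the excerpt the cache is re-checked every time the lock is taken, not only after waiting.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical, simplified cache: a synchronized map stands in for BlockManager.
class ToyCache[V] {
  private val store = mutable.Map.empty[String, V]
  private val loading = mutable.HashSet.empty[String]

  private def lookup(key: String): Option[V] = store.synchronized { store.get(key) }

  def getOrCompute(key: String)(compute: => V): V =
    lookup(key).getOrElse {
      // Either pick up a value loaded by someone else, or claim the key ourselves.
      val afterWait: Option[V] = loading.synchronized {
        while (loading.contains(key)) loading.wait() // another thread is loading
        val found = lookup(key)
        if (found.isEmpty) loading.add(key) // nobody succeeded, so we load it
        found
      }
      afterWait.getOrElse {
        try {
          val v = compute
          store.synchronized { store.put(key, v) }
          v
        } finally {
          // Release the key and wake the waiters, even if compute threw.
          loading.synchronized { loading.remove(key); loading.notifyAll() }
        }
      }
    }
}

object ToyCacheDemo {
  // Eight threads ask for the same key; returns (number of computes, distinct results).
  def demo(): (Int, Set[Int]) = {
    val cache = new ToyCache[Int]
    val computeCalls = new AtomicInteger(0)
    val results = new Array[Int](8)
    val threads = (0 until 8).map { i =>
      new Thread(new Runnable {
        def run(): Unit = {
          results(i) = cache.getOrCompute("rdd_0_0") {
            computeCalls.incrementAndGet()
            Thread.sleep(50) // simulate an expensive compute
            42
          }
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    (computeCalls.get, results.toSet)
  }
}
```

In this sketch the value is stored before the key is removed from the loading set, so a thread that wakes up or arrives later finds the value and does not recompute it. If compute throws, the finally block still releases the key, and a waiter then loads the partition itself, which corresponds to the "we'll try it ourselves" branch in the excerpt.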
How a Task's result gets to the DAGScheduler
For the value that a task produces, see "Spark Source Code Analysis -- Task".
For a ResultTask it is the computed value, for example the count;
for a ShuffleMapTask it is MapStatus(blockManager.blockManagerId, compressedSizes),
where compressedSizes records the size of the data that the shuffle buckets wrote to file.
// TaskRunner
val value = task.run(taskId.toInt)
val result = new TaskResult(value, accumUpdates, task.metrics.getOrElse(null))
// serializedResult is the serialized form of result (that step is not shown in this excerpt)
context.statusUpdate(taskId, TaskState.FINISHED, serializedResult) // context is a StandaloneExecutorBackend

// StandaloneExecutorBackend.statusUpdate
driver ! StatusUpdate(executorId, taskId, state, data)

// DriverActor.StatusUpdate
scheduler.statusUpdate(taskId, state, data.value)

// ClusterScheduler.statusUpdate
var taskSetToUpdate: Option[TaskSetManager] = None
taskSetToUpdate.get.statusUpdate(tid, state, serializedData)

// ClusterTaskSetManager.statusUpdate
case TaskState.FINISHED =>
  taskFinished(tid, state, serializedData)

// ClusterTaskSetManager.taskFinished
val result = ser.deserialize[TaskResult[_]](serializedData)
result.metrics.resultSize = serializedData.limit()
sched.listener.taskEnded(tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
// tasks = taskSet.tasks
// info is a TaskInfo
class TaskInfo(
  val taskId: Long,
  val index: Int,
  val launchTime: Long,
  val executorId: String,
  val host: String,
  val taskLocality: TaskLocality.TaskLocality)

// DAGScheduler.taskEnded
override def taskEnded(
    task: Task[_],
    reason: TaskEndReason,
    result: Any,
    accumUpdates: Map[Long, Any],
    taskInfo: TaskInfo,
    taskMetrics: TaskMetrics) {
  eventQueue.put(CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics))
}

// DAGScheduler.processEvent
handleTaskCompletion(completion)

// DAGScheduler.handleTaskCompletion
......
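The last hop of this chain, where taskEnded only enqueues a CompletionEvent and a separate loop handles it, can be sketched as follows. This is an illustration under assumptions: ToyScheduler, its two-field CompletionEvent and the StopEvent sentinel are hypothetical and much simpler than the DAGScheduler's real event types.

```scala
import java.util.concurrent.LinkedBlockingQueue

object EventQueueDemo {
  sealed trait SchedulerEvent
  // Hypothetical, reduced event: the real CompletionEvent carries more fields.
  case class CompletionEvent(taskIndex: Int, result: Any) extends SchedulerEvent
  // Sentinel added for this sketch so the loop can terminate.
  case object StopEvent extends SchedulerEvent

  class ToyScheduler(numTasks: Int) {
    private val eventQueue = new LinkedBlockingQueue[SchedulerEvent]()
    private val results = new Array[Any](numTasks)

    // Called from reporting threads; it only enqueues and returns immediately.
    def taskEnded(taskIndex: Int, result: Any): Unit =
      eventQueue.put(CompletionEvent(taskIndex, result))

    def stop(): Unit = eventQueue.put(StopEvent)

    // Runs on a single thread, so the results array needs no extra locking.
    def runLoop(): Seq[Any] = {
      var running = true
      while (running) {
        eventQueue.take() match {
          case CompletionEvent(i, r) => results(i) = r // "handleTaskCompletion"
          case StopEvent => running = false
        }
      }
      results.toSeq
    }
  }

  // Three "tasks" report per-partition counts; the loop collects and sums them.
  def demo(): Long = {
    val sched = new ToyScheduler(3)
    val reporters = Seq(4L, 7L, 1L).zipWithIndex.map { case (count, i) =>
      new Thread(new Runnable { def run(): Unit = sched.taskEnded(i, count) })
    }
    reporters.foreach(_.start())
    reporters.foreach(_.join())
    sched.stop()
    sched.runLoop().collect { case n: Long => n }.sum
  }
}
```

Funnelling every completion through one queue means the state that tracks finished tasks is touched by a single thread only, however many threads deliver status updates.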