Understanding Spark in Depth: Core Ideas and Source Code Analysis. 3.11 Creating and Starting ContextCleaner

3.11 Creating and Starting ContextCleaner

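ContextCleaner cleans up RDDs, ShuffleDependency objects, and Broadcast variables that have gone out of scope in the application, that is, objects the driver program no longer references. Because the configuration property spark.cleaner.referenceTracking defaults to true, SparkContext constructs and starts a ContextCleaner, as shown in the following code.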

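// Create the cleaner only when reference tracking is enabled (the default)
private[spark] val cleaner: Option[ContextCleaner] = {
    if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
        Some(new ContextCleaner(this))
    } else {
        None
    }
}
// Start the cleaning thread; this does nothing when cleaner is None
cleaner.foreach(_.start())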
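The Option-plus-foreach idiom above makes the cleaner an optional component: when the flag is false, cleaner is None and cleaner.foreach(_.start()) simply does nothing. The sketch below reproduces only that idiom outside Spark. MiniConf, MiniCleaner, and OptionalCleanerDemo are hypothetical stand-ins, not Spark classes, and MiniConf.getBoolean merely imitates the shape of SparkConf.getBoolean.

```scala
// MiniConf and MiniCleaner are hypothetical stand-ins, not Spark's SparkConf
// or ContextCleaner; they exist only to show the Option-based idiom.
class MiniConf(settings: Map[String, String]) {
  // Imitates the shape of SparkConf.getBoolean(key, defaultValue):
  // use the stored value if present, otherwise fall back to the default.
  def getBoolean(key: String, defaultValue: Boolean): Boolean =
    settings.get(key).map(_.toBoolean).getOrElse(defaultValue)
}

class MiniCleaner {
  @volatile var started: Boolean = false
  def start(): Unit = { started = true }
}

object OptionalCleanerDemo {
  // Same shape as the SparkContext code: construct the component only when
  // the flag is on (default true), then start it via Option.foreach,
  // which does nothing when the Option is None.
  def buildCleaner(conf: MiniConf): Option[MiniCleaner] = {
    val cleaner: Option[MiniCleaner] =
      if (conf.getBoolean("spark.cleaner.referenceTracking", true)) Some(new MiniCleaner)
      else None
    cleaner.foreach(_.start())
    cleaner
  }
}
```

With an empty MiniConf, buildCleaner returns a started Some(...); with the flag set to "false" it returns None. In a real application the flag would instead be set on the configuration, for example with SparkConf.set("spark.cleaner.referenceTracking", "false") before the SparkContext is created.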

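ContextCleaner consists of the following members:

referenceQueue: a java.lang.ref.ReferenceQueue[AnyRef]; once the garbage collector finds a tracked RDD, ShuffleDependency, or Broadcast object to be only weakly reachable, the corresponding weak reference is enqueued here;

referenceBuffer: holds the weak references (CleanupTaskWeakReference) themselves, so that they are not garbage-collected before the reference queue has handled them;

listeners: an array of listeners that are notified of cleanup work;

cleaningThread: the thread that performs the actual cleanup work.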
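referenceQueue and referenceBuffer cooperate through the JDK's java.lang.ref API. The minimal sketch below, assuming simplified hypothetical types DemoTask and TaskWeakRef in place of Spark's CleanupTask and CleanupTaskWeakReference, registers a weak reference with a queue, keeps the reference object itself alive in a buffer, and later takes it back off the queue. Real garbage collection is nondeterministic, so the usage note relies on Reference.enqueue() to simulate the collector.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import scala.collection.mutable.ArrayBuffer

// Hypothetical task type standing in for Spark's CleanupTask.
final case class DemoTask(id: Int)

// Like CleanupTaskWeakReference: a weak reference that also carries the task
// to run. `referent` is passed only to the superclass constructor, so this
// class does not hold a strong reference to the tracked object.
class TaskWeakRef(val task: DemoTask, referent: AnyRef, queue: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](referent, queue)

object WeakRefDemo {
  // The JVM enqueues a registered WeakReference here after clearing it.
  val referenceQueue = new ReferenceQueue[AnyRef]
  // Strong references to the WeakReference objects themselves; without this
  // buffer they could be collected before ever being enqueued.
  val referenceBuffer = ArrayBuffer.empty[TaskWeakRef]

  def register(obj: AnyRef, id: Int): TaskWeakRef = {
    val ref = new TaskWeakRef(DemoTask(id), obj, referenceQueue)
    referenceBuffer += ref
    ref
  }

  // Non-blocking check; if a reference was enqueued, forget it and return its task.
  def pollTask(): Option[DemoTask] =
    Option(referenceQueue.poll()).map(_.asInstanceOf[TaskWeakRef]).map { ref =>
      referenceBuffer -= ref
      ref.task
    }
}
```

To try it without waiting for a GC, keep a strong reference obj, call register(obj, 7), then call enqueue() on the returned reference; pollTask() then yields Some(DemoTask(7)) and referenceBuffer is empty again.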

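Like listenerBus, ContextCleaner uses the listener pattern and does its work on a dedicated thread; all that thread does is call the keepCleaning method. The implementation of keepCleaning is shown in Code Listing 3-48.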
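The listener-plus-thread structure just described can be sketched as follows. This is a simplified illustration under stated assumptions: DemoCleaner and DemoCleanerListener are hypothetical, and pending ids arrive on a LinkedBlockingQueue rather than a ReferenceQueue. In the Spark source, ContextCleaner's start() likewise marks the thread as a daemon, and its stop() interrupts the thread inside a synchronized block; the sketch copies that arrangement, which is what the "Synchronize here to avoid being interrupted on stop()" comment in Listing 3-48 relies on.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingQueue, TimeUnit}

// Hypothetical, simplified listener. Spark's CleanerListener instead has one
// callback per resource type (rddCleaned, shuffleCleaned, broadcastCleaned, ...).
trait DemoCleanerListener {
  def cleaned(id: Int): Unit
}

// Hypothetical cleaner: work arrives on a LinkedBlockingQueue rather than a
// ReferenceQueue, to keep the example deterministic.
class DemoCleaner {
  @volatile private var stopped = false
  private val pending = new LinkedBlockingQueue[Integer]()
  private val listeners = new CopyOnWriteArrayList[DemoCleanerListener]()

  // As with ContextCleaner, the thread body only runs the cleaning loop.
  private val cleaningThread = new Thread(() => keepCleaning())

  def attachListener(listener: DemoCleanerListener): Unit = { listeners.add(listener); () }
  def submit(id: Int): Unit = pending.put(id)

  def start(): Unit = {
    cleaningThread.setDaemon(true) // a daemon thread will not keep the JVM alive
    cleaningThread.setName("Demo Context Cleaner")
    cleaningThread.start()
  }

  def stop(): Unit = {
    stopped = true
    // Interrupt only while holding the lock the loop holds during a cleanup,
    // so a cleanup that is already running is never cut off midway.
    synchronized { cleaningThread.interrupt() }
    cleaningThread.join()
  }

  private def keepCleaning(): Unit = {
    while (!stopped) {
      try {
        // Bounded wait, so the loop re-checks `stopped` at least every 100 ms.
        Option(pending.poll(100, TimeUnit.MILLISECONDS)).foreach { id =>
          synchronized {
            // A real cleaner would release resources here, then notify listeners.
            val it = listeners.iterator()
            while (it.hasNext) it.next().cleaned(id.intValue)
          }
        }
      } catch {
        case _: InterruptedException if stopped => // expected: stop() interrupted us
      }
    }
  }
}
```

The poll timeout bounds how long the loop can go without re-checking stopped, which is the role REF_QUEUE_POLL_TIMEOUT plays in keepCleaning.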

Code Listing 3-48 Implementation of keepCleaning

private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
    while (!stopped) {
        try {
            // Wait up to REF_QUEUE_POLL_TIMEOUT ms for a reference whose referent was collected
            val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
                .map(_.asInstanceOf[CleanupTaskWeakReference])
            // Synchronize here to avoid being interrupted on stop()
            synchronized {
                reference.map(_.task).foreach { task =>
                    logDebug("Got cleaning task " + task)
                    // The reference has been handled, so it no longer needs to be kept alive
                    referenceBuffer -= reference.get
                    // Dispatch on the kind of resource to clean up
                    task match {
                        case CleanRDD(rddId) =>
                            doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
                        case CleanShuffle(shuffleId) =>
                            doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
                        case CleanBroadcast(broadcastId) =>
                            doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
                    }
                }
            }
        } catch {
            // stop() interrupts the thread; that is expected once stopped is set
            case ie: InterruptedException if stopped => // ignore
            case e: Exception => logError("Error in cleaning thread", e)
        }
    }
}
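To look at one pass of the loop in Listing 3-48 in isolation, the following sketch re-declares simplified, hypothetical versions of Spark's private CleanupTask case classes and CleanupTaskWeakReference, and replaces the doCleanupRDD, doCleanupShuffle, and doCleanupBroadcast calls with entries in a cleaned buffer. cleanOnce corresponds to a single iteration of the while loop: ReferenceQueue.remove(timeout) blocks for at most timeoutMs and returns null on timeout, which Option turns into None.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import scala.collection.mutable.ArrayBuffer

// Simplified, hypothetical re-declarations of Spark's private cleanup types.
sealed trait CleanupTask
final case class CleanRDD(rddId: Int) extends CleanupTask
final case class CleanShuffle(shuffleId: Int) extends CleanupTask
final case class CleanBroadcast(broadcastId: Long) extends CleanupTask

// `referent` is passed only to the superclass constructor, so this class
// holds no strong reference to the tracked object.
class CleanupTaskWeakReference(val task: CleanupTask, referent: AnyRef, queue: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](referent, queue)

object MiniKeepCleaning {
  val referenceQueue = new ReferenceQueue[AnyRef]
  val referenceBuffer = ArrayBuffer.empty[CleanupTaskWeakReference]
  // Stands in for the doCleanupXxx methods: records what would be cleaned.
  val cleaned = ArrayBuffer.empty[String]

  def registerForCleanup(obj: AnyRef, task: CleanupTask): Unit =
    referenceBuffer.synchronized {
      referenceBuffer += new CleanupTaskWeakReference(task, obj, referenceQueue)
      ()
    }

  // One iteration of the while loop in keepCleaning.
  def cleanOnce(timeoutMs: Long): Unit = {
    val reference = Option(referenceQueue.remove(timeoutMs))
      .map(_.asInstanceOf[CleanupTaskWeakReference])
    synchronized {
      reference.foreach { ref =>
        referenceBuffer.synchronized { referenceBuffer -= ref }
        // CleanupTask is sealed, so the compiler warns if a subtype is not handled.
        ref.task match {
          case CleanRDD(rddId)             => cleaned += s"rdd $rddId"
          case CleanShuffle(shuffleId)     => cleaned += s"shuffle $shuffleId"
          case CleanBroadcast(broadcastId) => cleaned += s"broadcast $broadcastId"
        }
      }
    }
  }
}
```

Registering two strongly held objects with CleanRDD(3) and CleanBroadcast(9L), calling enqueue() on both references to stand in for garbage collection, and then calling cleanOnce twice records "rdd 3" and "broadcast 9" (in either order, since the order in which a ReferenceQueue hands back references is not something to rely on) and leaves referenceBuffer empty. The sealed trait plus pattern match mirrors the dispatch in the listing, where each task type is routed to its own cleanup method.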
