《深入理解Spark:核心思想与源码分析》——3.3节创建metadataCleaner

本节书摘来自华章社区《深入理解Spark:核心思想与源码分析》一书中的第3章,第3.3节创建metadataCleaner,作者耿嘉安,更多章节内容可以访问云栖社区“华章社区”公众号查看

3.3 创建metadataCleaner
SparkContext为了保持对所有持久化的RDD的跟踪,使用类型是TimeStamped-WeakValueHashMap的persistentRdds缓存。metadataCleaner的功能是清除过期的持久化RDD。创建metadataCleaner的代码如下。

private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]]
private[spark] val metadataCleaner =
    new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
我们仔细看看MetadataCleaner的实现,见代码清单3-14。
代码清单3-14 MetadataCleaner的实现
private[spark] class MetadataCleaner(
        cleanerType: MetadataCleanerType.MetadataCleanerType,
        cleanupFunc: (Long) => Unit,
        conf: SparkConf)
    extends Logging
{
    val name = cleanerType.toString

    private val delaySeconds = MetadataCleaner.getDelaySeconds(conf, cleanerType)
    private val periodSeconds = math.max(10, delaySeconds / 10)
    private val timer = new Timer(name + " cleanup timer", true)

    private val task = new TimerTask {
        override def run() {
        try {
            cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
            logInfo("Ran metadata cleaner for " + name)
        } catch {
            case e: Exception => logError("Error running cleanup task for " + name, e)
        }
      }
    }

    if (delaySeconds > 0) {
        timer.schedule(task, delaySeconds * 1000, periodSeconds * 1000)
    }

    def cancel() {
        timer.cancel()
    }
}
从MetadataCleaner的实现可以看出其实质是一个用TimerTask实现的定时器,不断调用cleanupFunc: (Long) => Unit这样的函数参数。构造metadataCleaner时的函数参数是cleanup,用于清理persistentRdds中的过期内容,代码如下。
private[spark] def cleanup(cleanupTime: Long) {
    persistentRdds.clearOldValues(cleanupTime)
}
上一篇:在哪里注册域名比较好?


下一篇:阿里云服务器双11配置仅需85元/年