Spark源码阅读笔记一——part of core

内部accumulator通过心跳报告给driver
task运行时可以累加accumulator,但是不能读取value,value只能在driver获取
spark内部用一个weakhashmap保存accumulator,便于gc的清理

CacheManager
spark的类用于负责传递RDD的分区内容给BlockManager,并保证一个节点不会载入一个rdd的两份拷贝,这个通过一个hashset实现,已载入的rdd会将id保存到set中
获取和计算rdd时,先判断是否已经计算,如果没有再从blockmanager获取block然后计算结果。
除非是本地模式,不然rdd的计算结果都会缓存
如果rdd不需要在内存中缓存,则直接将计算结果通过iterator直接传给blockmanager
在rdd需要缓存时,我们必须小心不能在内存中一次性展开全部的partition,否则如果jvm没有足够的空间给这个单个的partition可能会引发OOM异常。
取而代之的是,我们展开这些value,小心的、可能的放弃并丢掉这个partition到磁盘如果合适。
如果空间足够就全部缓存到内存中,否则如果使用磁盘就放到磁盘,不然直接就返回value

Dependency
NarrowDependency:一个子partition依赖于多个父partition
ShuffleDependency:shuffle stage的输出依赖,在shuffle中,rdd是短暂的因为我们在executor端不需要它

ExecutorAllocationClient
与cluster manager请求或杀掉executor的客户端
根据我们的调度需要更新集群,依赖于三个信息
1 executor的数量,我们需要的全部的executor数,cluster manager不能杀掉任何运行中的executor来达到这个数量,这是我们想要分配的executor数量
2 所有要运行的stage中有本地偏好的task数量,包括运行等待和完成的task
3 task到运行host的map

ExecutorAllocationManager(EAM)
一个代理,根据工作负载动态的分配和移除executor
EAM维护一个移动的目标executor数量,定期的同步到cluster manager。target的数量从配置的一个初始值开始,并根据等待和运行task数变化
在当前的target数量多于需要控制的当前的负载时,会减少target数量。target总是会一次性减到可以运行所以当前运行和等待task的数量
当需要响应积压的等待需要调度的task时,会增加target的数量。如果一个队列在N秒内没有排空,则新的executor被加入。如果这个队列仍然在另外的M秒内存在,则更多的executor会被加入。增加的数量在每轮以上一轮的指数级增加,直到达到上限。上限是基于一个配置的属性和当前运行和等待任务的数量。
指数增长有双重理由。
1 executor应该在开始缓慢的增加,以防万一额外需要的executor数量很小。否则我们增加了多于我们需要的executor数量则我们需要在后面移除他们。
2 executor的数量需要快速增加,以防万一executor的数量最大值非常高,否则在繁重的工作负载下性能提升需要很长时间。
executor移除的策略很简单,如果一个executor已经空闲了K秒,意味着它没有被调度用于执行任何task,因而移除它。
这里没有重试的逻辑,因为我们假定cluster manager最终会异步的执行所有它收到的请求。
相关的spark属性如下

成员变量initializing,是否需要一直等待初始化的executor集合被分配,当这个变量为true的时候,我们不会取消未执行的executor请求。这个在下面两种情况会被设置成false
1 一个stage被提交
2 一个executor的空闲时间超时
用于增加减少executor的调度任务是一个定时任务,每100毫秒执行一次
调度方法上,首先基于添加时间和我们当前的需要调整我们请求的executor,然后如果一个已存在的executor已经过期了,则杀掉。

updateAndSyncNumExecutorsTarget:更新target数量并同步结果到cluster manager。检查我们已存在的分配和之前的请求超过我们现在的需要。如果满足,truncate target数量并让cluster manager知道以便于它可以取消不需要的等待的请求。如果不满足,并且添加的时间超时,看看我们是否能请求新的executor,并刷新添加时间。

当一个executor(程它为executor X)因为到达了下限而没有被删除,则它不会再被标记位空闲。当有新的executor加入,我们不再在最低下限,则我们必须再次标记executor X为空闲,以使我们不会忘记它是一个被移除的候选。
当scheduler的队列是空的时候,就会将addtime设为未设置
所有cache的block会被报告给driver,但不包括广播的block
当executor执行任务了(busy),就会清除它的idle time
private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
stageid到tuple的map,tuple是节点和将会在这个节点上运行的task的数量
taskstart和blockmanageradded这些事件是在不同的线程执行的,因而顺序不一定,taskstart事件中将对应的executor置为busy
taskend,如果executor不再运行任何调度的任务,则标记为idle
如果task失败,则会将scheduler置为积压任务的状态,将这个task从这个stage对应的task列表中移除

上一篇:阿里云HBase推出全新X-Pack服务 定义HBase云服务新标准


下一篇:阿里云获CCIA开源数据库专业委员会副会长单位 推动产业创新变革