关于Yarn的一些常见问题
Yarn三种调度策略
理想情况下,应用对Yarn资源的请求应该立刻得到满足,但现实情况资源往往是有限的,特别是在一个很繁忙的集群,一个应用资源的请求经常需要等待一段时间才能的到相应的资源。在Yarn中,负责给应用分配资源的是Scheduler。在Yarn中有三种调度器可以选择:FIFO Scheduler ,Capacity Scheduler,FairScheduler。
-
FIFO Scheduler
FIFO Scheduler把应用按提交的顺序排成一个队列,这是一个先进先出队列,在进行资源分配的时候,先给队列中最头上的应用进行分配资源,待最头上的应用需求满足后再给下一个分配,依次类推。
FIFO Scheduler是最简单也是最容易理解的调度器,不需要任何配置,但它并不适用于共享集群。大的应用可能会占用所有集群资源,这就导致其它应用被阻塞。在共享集群中,更适合采用Capacity Scheduler或Fair Scheduler,这两个调度器都允许大任务和小任务在提交的同时获得一定的系统资源。
图中,在FIFO 调度器中,小任务会被大任务阻塞。 -
Capacity Scheduler
对于Capacity调度器,有一个专门的队列用来运行小任务,但是为小任务专门设置一个队列会预先占用一定的集群资源,这就导致大任务的执行时间会落后于使用FIFO调度器时的时间。
支持多个队列,每个队列可配置一定量的资源,每个采用FIFO的方式调度。为了防止同一个用户的job任务独占队列中的资源,调度器会对同一用户提交的job任务所占资源进行限制。分配新的job任务时,首先计算每个队列中正在运行task个数与其队列应该分配的资源量做比值,然后选择比值最小的队列。其次,按照job任务的优先级和时间顺序,同时要考虑到用户的资源量和内存的限制,对队列中的job任务进行排序执行。多个队列同时按照任务队列内的先后顺序一次执行。
Capacity调度器根据来自NodeManager的信息,将Container放到最适合的节点上。当工作负载可以预见的情况下,Capacity调度器效果最好,因为这有助于分配最小容量。要使调度器有效工作,需要为每个队列分配小于最高预期负载的最小容量。在每个队列内部,使用层次化的FIFO来调度多个应用程序,类似于在独立的FIFO调度其中使用的方式。 -
Fair Scheduler
在Fair调度器中不需要预先占用一定的系统资源,Fair调度器会为所有运行的job动态的调整系统资源。支持多个队列,每个队列可以配置一定的资源,每个队列中的job任务公平共享其所在队列的所有资源。队列中的job任务都是按照优先级分配资源,优先级越高分配的资源越多,但是为了确保公平每个job任务都会分配到资源。优先级是根据每个job任务的理想获取资源量减去实际获取资源量的差值决定的,差值越大优先级越高。
如图所示,当第一个大job提交时,只有这一个job在运行,此时它获得了所有集群资源;当第二个小任务提交后,Fair调度器会分配一半资源给这个小任务,让这两个任务公平的共享集群资源。
需要注意的是,在Fair调度器中,从第二个任务提交到获得资源会有一定的延迟,因为它需要等待第一个任务释放占用的Container。小任务执行完成之后也会释放自己占用的资源,大任务又获得了全部的系统资源。最终的效果就是Fair调度器即得到了高的资源利用率又能保证小任务及时完成。
Fair调度器支持 抢占,可以从ApplicationMaster要回Container。即当单个作业运行时,该作业获得了所有集群资源,当有另一个作业提交后,调度器会要回Container并分配适当的资源给这个作业(要回的在将来被分配的资源的多少取决于作业的大小,公平起见)。在这个过程中后面提交的任务获得资源会有一定延迟,因为需要等待前面任务的Container被ApplicationMaster要回。Fair调度器与Capacity调度器不同的一点就是,Fair调度器不需要事先占用一定的资源,它根据提交的作业动态调整系统资源。
Yarn抢占
在FairScheduler初始化会产生一个叫做updateThread的deamon线程:
private void initScheduler(Configuration conf) throws IOException {
synchronized (this) {
//省略
//创建更新线程,负责监控队列的状态并伺机进行抢占
updateThread = new UpdateThread();
updateThread.setName("FairSchedulerUpdateThread");
updateThread.setDaemon(true);
//省略
}
private class UpdateThread extends Thread {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(updateInterval);
long start = getClock().getTime();
update();
preemptTasksIfNecessary();
long duration = getClock().getTime() - start;
fsOpDurations.addUpdateThreadRunDuration(duration);
} catch (InterruptedException ie) {
LOG.warn("Update thread interrupted. Exiting.");
return;
} catch (Exception e) {
LOG.error("Exception in fair scheduler UpdateThread", e);
}
}
}
}
这个线程负责不断计算集群需要的资源并进行抢占,计算所需资源并抢占,发生在UpdateThread.preemptTasksIfNecessary()方法中:
/**
* 检查所有缺乏资源的Scheduler, 无论它缺乏资源是因为处于minShare的时间超过了minSharePreemptionTimeout
* 还是因为它处于fairShare的时间已经超过了fairSharePreemptionTimeout。在统计了所有Scheduler
* 缺乏的资源并求和以后,就开始尝试进行资源抢占。
*/
protected synchronized void preemptTasksIfNecessary() {
if (!shouldAttemptPreemption()) { //检查集群是否允许抢占发生
return;
}
long curTime = getClock().getTime();
if (curTime - lastPreemptCheckTime < preemptionInterval) {
return;//还没有到抢占时机,等下一次机会吧
}
lastPreemptCheckTime = curTime;
//初始化抢占参数为none,即什么也不抢占
Resource resToPreempt = Resources.clone(Resources.none());
for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
//计算所有叶子队列需要抢占的资源,累加到资源变量resToPreempt中
Resources.addTo(resToPreempt, resToPreempt(sched, curTime));
}
if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt,
Resources.none())) { //如果需要抢占的资源大于Resources.none(),即大于0
preemptResources(resToPreempt);//已经计算得到需要抢占多少资源,那么,下面就开始抢占了
}
}
shouldAttemptPreemption()用来从整个集群的层次判断是否应该尝试进行资源抢占,如果整个集群的层次不满足抢占条件,当然就不可以进行抢占:
private boolean shouldAttemptPreemption() {
if (preemptionEnabled) {//首先检查配置文件是否打开抢占
return (preemptionUtilizationThreshold < Math.max(
(float) rootMetrics.getAllocatedMB() / clusterResource.getMemory(),
(float) rootMetrics.getAllocatedVirtualCores() /
clusterResource.getVirtualCores()));
}
return false;
}
shouldAttemptPreemption的判断标准主要有两个
- 是否已经开启了抢占:即yarn.scheduler.fair.preemption是否配置为true
- 整体集群资源利用率是否已经超过了yarn.scheduler.fair.preemption.cluster-utilization-threshold的配置值
如果以上条件均满足,则可以进行抢占相关的工作,包括计算需要抢占的资源以及进行抢占。
FairSchduler.resToPreempt()方法用来计算当前的Schedulable需要抢占的资源的大小,属于FairScheduler的核心方法,源码如下:
/**
* 计算这个队列允许抢占其它队列的资源大小。如果这个队列使用的资源低于其最小资源的时间超过了抢占超时时间,那么,
* 应该抢占的资源量就在它当前的fair share和它的min share之间的差额。如果队列资源已经低于它的fair share
* 的时间超过了fairSharePreemptionTimeout,那么他应该进行抢占的资源就是满足其fair share的资源总量。
* 如果两者都发生了,则抢占两个的较多者。
*/
protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
long minShareTimeout = sched.getMinSharePreemptionTimeout();//minSharePreemptionTimeout
long fairShareTimeout = sched.getFairSharePreemptionTimeout();//fairSharePreemptionTimeout
Resource resDueToMinShare = Resources.none();//因为资源低于minShare而需要抢占的资源总量
Resource resDueToFairShare = Resources.none();//因为资源低于fairShare 而需要抢占的资源总量
if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {//时间超过minSharePreemptionTimeout,则可以判断资源是否低于minShare
//选取sched.getMinShare()和sched.getDemand()中的较小值,demand代表队列资源需求量,即处于等待或者运行状态下的应用程序尚需的资源量
Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
sched.getMinShare(), sched.getDemand());
//选取Resources.none(即0)和 Resources.subtract(target, sched.getResourceUsage())中的较大值,即
//如果最小资源需求量大于资源使用量,则取其差额,否则,取0,代表minShare已经满足条件,无需进行抢占
resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
}
if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) {// //时间超过fairSharePreemptionTimeout,则可以判断资源是否低于fairShare
//选取sched.getFairShare()和sched.getDemand()中的较小值,demand代表队列资源需求量,即处于等待或者运行状态下的应用程序尚需的资源量
//如果需要2G资源,当前的fairshare是2.5G,则需要2.5G
Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
sched.getFairShare(), sched.getDemand());
//选取Resources.none(即0)和 Resources.subtract(target, sched.getResourceUsage())中的较大值,即
//如果fair share需求量大于资源使用量,则取其差额,否则,取0,代表minShare已经满足条件,无需进行抢占
//再拿2.5G和当前系统已经使用的资源做比较,如果2.5G-usedResource<0, 则使用Resources.none(),即不需要抢占
//否则,抢占资源量为2.5G-usedResource<0
resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
}
Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterResource,
resDueToMinShare, resDueToFairShare);
if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
resToPreempt, Resources.none())) {
String message = "Should preempt " + resToPreempt + " res for queue "
+ sched.getName() + ": resDueToMinShare = " + resDueToMinShare
+ ", resDueToFairShare = " + resDueToFairShare;
LOG.info(message);
}
return resToPreempt;
}
根据Yarn的设计,由于资源抢占本身是一种资源的强行剥夺,会带来一定的系统开销。因此,Yarn会在实际抢占发生前,耐心等待一段时间,以尽量直接使用其它应用释放的资源来使用,而尽量避免使用抢占的方式。因此,在FairScheduler.xml中,需要配置这两个超时时间:
- minSharePreemptionTimeout 表示如果超过该指定时间,Scheduler还没有获得minShare的资源,则进行抢占
- fairSharePreemptionTimeout 表示如果超过该指定时间,Scheduler还没有获得fairShare的资源,则进行抢占
在计算完整体需要进行抢占的资源,就可以进行资源抢占了。这里结合Yarn关于FairScheduler的官方文档,必须清楚FairScheduler内部定义了不同的Policy决定进行资源抢占的方式,包括fair policy, drf policy以及 fifo policy。默认,是fair policy。
/**
* 基于已经计算好的需要抢占的资源(toPreempt()方法)进行资源抢占。每一轮抢占,我们从root 队列开始,
* 一级一级往下进行,直到我们选择了一个候选的application.当然,抢占分优先级进行。
* 依据每一个队列的policy,抢占方式有所不同。对于fair policy或者drf policy, 会选择超过
* fair share(这里的fair scheduler都是指Instantaneous Fair Share)
* 最多的ChildSchedulable进行抢占,但是,如果是fifo policy,则选择最后执行的application进行
* 抢占。当然,同一个application往往含有多个container,因此同一个application内部container
* 的抢占也分优先级。
*/
protected void preemptResources(Resource toPreempt) {
long start = getClock().getTime();
if (Resources.equals(toPreempt, Resources.none())) {
return;
}
//warnedContainers,被警告的container,即在前面某轮抢*被认为满足被强占条件的container 同样,yarn发现一个container满足被抢占规则,绝对不是立刻抢占,而是等待一个超时时间,试图让app自动释放这个container,如果到了超时时间还是没有,那么就可以直接kill了
Iterator<RMContainer> warnedIter = warnedContainers.iterator();
//toPreempt代表依旧需要进行抢占的资源
while (warnedIter.hasNext()) {
RMContainer container = warnedIter.next();
if ((container.getState() == RMContainerState.RUNNING ||
container.getState() == RMContainerState.ALLOCATED) &&
Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
toPreempt, Resources.none())) {
warnOrKillContainer(container);
Resources.subtractFrom(toPreempt, container.getContainer().getResource());//抢占到了一个container,则从toPreempt中去掉这个资源
} else {
warnedIter.remove();
}
}
try {
// Reset preemptedResource for each app
for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
queue.resetPreemptedResources();
}
//toPreempt代表了目前仍需要抢占的资源,通过不断循环,一轮一轮抢占,toPreempt逐渐减小
while (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
toPreempt, Resources.none())) { //只要还没有达到抢占要求
RMContainer container =
getQueueManager().getRootQueue().preemptContainer();
if (container == null) {
break;
} else {
//找到了一个待抢占的container,同样,警告或者杀死这个container
warnOrKillContainer(container);
warnedContainers.add(container);
//重新计算剩余需要抢占的资源
Resources.subtractFrom(
toPreempt, container.getContainer().getResource());
}
}
} finally {
// Clear preemptedResources for each app
for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
queue.clearPreemptedResources();
}
}
long duration = getClock().getTime() - start;
fsOpDurations.addPreemptCallDuration(duration);
}
每一轮抢占,都会通过方法warnOrKillContainer来检查并处理所有的warnedContainers。
protected void warnOrKillContainer(RMContainer container) {
ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
FSAppAttempt app = getSchedulerApp(appAttemptId);
FSLeafQueue queue = app.getQueue();
LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
"res=" + container.getContainer().getResource() +
") from queue " + queue.getName());
Long time = app.getContainerPreemptionTime(container);
if (time != null) {
// if we asked for preemption more than maxWaitTimeBeforeKill ms ago,
// proceed with kill
//如果这个container在以前已经被标记为需要被抢占,并且时间已经超过了maxWaitTimeBeforeKill,那么这个container可以直接杀死了
if (time + waitTimeBeforeKill < getClock().getTime()) {
ContainerStatus status =
SchedulerUtils.createPreemptedContainerStatus(
container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
// TODO: Not sure if this ever actually adds this to the list of cleanup
// containers on the RMNode (see SchedulerNode.releaseContainer()).
completedContainer(container, status, RMContainerEventType.KILL); //执行清理工作
LOG.info("Killing container" + container +
" (after waiting for premption for " +
(getClock().getTime() - time) + "ms)");
}
} else {
//把这个container标记为可能被抢占,也就是所谓的container警告,在下一轮或者几轮,都会拿出这个container判断是否超过了maxWaitTimeBeforeKill,如果超过了,则可以直接杀死了。
// track the request in the FSAppAttempt itself
app.addPreemption(container, getClock().getTime());
}
}
从方法名称可以看到,结果有两种:
- 杀死:如果这个container之前已经被标记为待抢占,并且距离标记时间已经超过了waitTimeBeforeKill却依然没有被自己的ApplicationMaster主动释放的container(太不自觉),如果是,那么既然在waitTimeBeforeKill之前已经向其主人(ApplicationMaster)发出警告,那么现在FairScheduler失去了耐心,直接杀死这个Container。
- 死期未到:如果这个Container之前已经被标记为抢占,但是距离标记时间还不到waitTimeBeforeKill,那么此次侥幸逃脱,下次再进行判断
- 标记和警告:如果这个container还从来没有被标记为待抢占,那么这次就进行标记,记录标记时间,下次updateThread到来,这个container会历经被杀死或者暂时死期未到。
completedContainer(container, status, RMContainerEventType.KILL);是一个典型的状态机过程,当前发生的事件是RMContainerEventType.KILL,即发生kill事件,然后ResourceManager端的container实现RMContainerImpl会根据自己的当前状态以及发生的kill事件,得出目标状态。
如果warnedContainer被抢占来的资源依然小于toPreempt,那就只好从队列里面选择某些container来抢占,抢占规则,就是队列具体定义的Policy。这段逻辑在preemptResources()方法的这段代码里:
try {
// Reset preemptedResource for each app
for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
queue.resetPreemptedResources();
}
//toPreempt代表了目前仍需要抢占的资源,通过不断循环,一轮一轮抢占,toPreempt逐渐减小
while (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
toPreempt, Resources.none())) { //只要还没有达到抢占要求
//通过具体队列的Policy要求,选择一个container用来被抢占
RMContainer container =
getQueueManager().getRootQueue().preemptContainer();
if (container == null) {
break;
} else {
warnOrKillContainer(container);
//将这个container加入到警告列表,以后每一轮都会检查它是否被释放或者抢占,如果超过了一定时间还是没有被抢占或者主动释放,就可以直接kill并抢占了
warnedContainers.add(container);
Resources.subtractFrom(
toPreempt, container.getContainer().getResource());
}
}
} finally {
// Clear preemptedResources for each app
for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
queue.clearPreemptedResources();
}
}
从RMContainer container =getQueueManager().getRootQueue().preemptContainer(); 出发,看看具体的Policy是如何决定选择哪个container进行抢占(执行死刑)的。选择FairScheduler默认的Policy FairSharePolicy来进行分析。分析的过程是,实际上是沿着队列树按照深度优先,逐渐往下遍历直至找到一个container用来抢占。
FSParentQueue.preemptContainer()
/**
* 从root queue开始,找出一个可以被抢占的container进行抢占。
* 决策和遍历过程实际上是一个递归调用的过程,从root queue开始,不断
* 由下级队列决定抢占自己下一级的哪个queue或者application或者container
* 最终,是由LeafQueue选择一个Application,然后Application选择一个
* Container
*/
@Override
public RMContainer preemptContainer() {
RMContainer toBePreempted = null;
// Find the childQueue which is most over fair share
FSQueue candidateQueue = null;
Comparator<Schedulable> comparator = policy.getComparator();
//从自己所有的子队列中选择一个最应该被抢占的队列
for (FSQueue queue : childQueues) {
if (candidateQueue == null ||
comparator.compare(queue, candidateQueue) > 0) {
candidateQueue = queue;
}
}
// Let the selected queue choose which of its container to preempt
//选择出来了一个待抢占的队列以后,让这个队列自行决定抢占哪个container,采用**递归**调用的方式
if (candidateQueue != null) {
toBePreempted = candidateQueue.preemptContainer();
}
return toBePreempted;
}
从FSParentQueue.preemptContainer()的递归方式来看,寻找被抢占的container的过程,是从队列树的root queue开始,采用深度优先的方式进行。FairSharePolicy使用的资源比较器是DefaultResourceCalculator,从DefaultResourceCalculator中不难看出,进行资源大小的比较时,只考虑了memory,没有考虑vCore。因此,就FSLeafQueue.preemptContainer()而言,LeafQueue的意思是下面没有子队列。
FSLeafQueue.preemptContainer()
@Override
public RMContainer preemptContainer() {
RMContainer toBePreempted = null;
// If this queue is not over its fair share, reject
if (!preemptContainerPreCheck()) {
return toBePreempted;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Queue " + getName() + " is going to preempt a container " +
"from its applications.");
}
// Choose the app that is most over fair share
Comparator<Schedulable> comparator = policy.getComparator();
FSAppAttempt candidateSched = null;
readLock.lock();
try {
//从该叶子队列中的所有application中,选择一个更应该被强占的application
//如果使用默认Policy FairSharePolicy,那么选择标准就是该Application当前资源
//的欠缺或者充裕程度,资源越充裕,越可能被选中
for (FSAppAttempt sched : runnableApps) {
if (candidateSched == null ||
comparator.compare(sched, candidateSched) > 0) {
candidateSched = sched;
}
}
} finally {
readLock.unlock();
}
// Preempt from the selected app
if (candidateSched != null) {
//由于是叶子队列,因此candidateSched肯定是一个APP,即FSAppAttempt对象
toBePreempted = candidateSched.preemptContainer();
}
return toBePreempted;
}
FSLeafQueue与FSParentQueue的抢占逻辑是几乎相同的,都是通过递归遍历进行深度优先遍历,唯一的区别,就是FSParentQueue的child是FSParentQueue或者FSLeafQueue,而FSLeafQueue的child是FSAppAttemtp。
FSAppAttempt.preemptContainer()
/**
* 根据优先级,从application的所有container中选择一个container用来被抢占
*/
@Override
public RMContainer preemptContainer() {
//省略
RMContainer toBePreempted = null;
//获取自己所有的running container
for (RMContainer container : getLiveContainers()) {
//使用比较器RMContainerComparator选择出一个最应该被抢占的container
if (!getPreemptionContainers().contains(container) &&
(toBePreempted == null ||
comparator.compare(toBePreempted, container) > 0)) {
toBePreempted = container;
}
}
return toBePreempted;
}
}
FSAppAttempt用来决定自己的哪个container拿出来被抢占,采用的是比较器RMContainerComparator,这个比较器代码简单,贴出来:
static class RMContainerComparator implements Comparator<RMContainer>,
Serializable {
@Override
public int compare(RMContainer c1, RMContainer c2) {
int ret = c1.getContainer().getPriority().compareTo(
c2.getContainer().getPriority());
if (ret == 0) {
return c2.getContainerId().compareTo(c1.getContainerId());
}
return ret;
}
}
可见,规则就是比较优先级,选择一个优先级较低的container,如果优先级相同,则比较containerId并选择一个id比较小的container。
这里可以看到Yarn队列设计的精妙之处。无论是parentQueue , 还是leaf queue ,或者是Application,虽然处在一个tree的不同level,但是他们的性质是一样的,都被抽象为Schedulable,因此都需要实现preemptContainer()方法。在决定哪个container被抢占的时候,就可以递归进行,ParentQueue交给下面的leaf Queue 或者 下面的ParentQueue决定,而LeafQueue则交给下面的Application决定,Applactiion则根据container的优先级,决定哪个container被抢占。
从以上代码的整体逻辑可以看到,yarn进行资源抢占,在计算需要抢占多少资源的时候,是从整个yarn集群的范围内进行计算的,而不是为了满足某一个application的资源而为了它进行单独的抢占。