大数据理论与实践 源码分析 YARN资源调度策略

YARN资源调度策略

对于YARN的介绍,可以参考之前的文章:
大数据理论与实践4 分布式资源管理系统YARN

根据官方文档,YARN支持了三种资源调度策略,分别是:

  • FIFO调度器 FIFO Scheduler
  • 容量调度器 Capacity Scheduler
  • 公平调度器 Fair Scheduler

下面分别来看看这几个调度器是如何具体实现的。源码来自Hadoop3.3.1

调度流程

大数据理论与实践 源码分析 YARN资源调度策略
图源:【Yarn源码分析】FairScheduler资源调度-心跳调度
在需要进行资源调度时(心跳调度),会在心跳中发送NODE_UPDATE事件,此时调用调度器的算法来分配Container资源。
在调度器中的handle()函数会进行各种事件对应的处理:

/*
* org.apache.hadoop.yarn.server.resourcemanager.scheduler
* .fifo FifoScheduler.java
* .capacity CapacityScheduler.java
* .fair FairScheduler.java
*/
public void handle(SchedulerEvent event) {
    long start = getClock().getTime();
    switch (event.getType()) {
    case NODE_ADDED:  // 省略
    case NODE_REMOVED: // 省略
    case NODE_UPDATE: //执行资源调度逻辑
    case APP_ADDED: // 省略
    case APP_REMOVED: // 省略
    case NODE_RESOURCE_UPDATE: // 省略
    case APP_ATTEMPT_ADDED: // 省略
    case APP_ATTEMPT_REMOVED: // 省略
    case CONTAINER_EXPIRED: // 省略
    case CONTAINER_RESCHEDULED: // 省略
    default:
      LOG.error("Unknown event arrived at FairScheduler: " + event.toString());
    }
  }

也就是说在分析源码时要重点关注handle()中的NODE_UPDATE下的逻辑。

容量调度器

概述

大数据理论与实践 源码分析 YARN资源调度策略

容量调度器CapacityScheduler是一个可插拔的Hadoop调度程序,它允许多租户安全地共享一个大型集群,以便他们的应用程序在分配容量的限制下及时分配资源。
核心思想:提前做预算,在预算指导下分享集群资源。
调度策略

  • 集群资源由多个队列分享
  • 每个队列都要预设资源分配比例(提前做预算,预算是指导原则)
  • 空闲资源优先分配给“实际资源/预算资源”比值最低的队列(保持弹性)
  • 队列内部采用FIFO调度策略

特点(来自Hadoop3.3.1文档)

  1. 分层队列:支持队列分层结构,以确保在允许其他队列使用空闲资源之前在组织的子队列之间共享资源
  2. 容量保证:每个队列都要预设资源占比,防止资源独占
  3. 安全性:每个队列都有严格的 ACL。还有一些安全措施可确保用户无法查看和/或修改其他用户的应用程序。此外,还支持每个队列和系统管理员角色。
  4. 弹性:空闲资源可以分配给任何队列,但当多个队列争用时,会按比例进行平衡,同时也支持抢占
  5. 多租户:多用户共享集群资源,防止单个应用程序、用户和队列独占队列或整个集群的资源
  6. 可操作性:
    支持运行时配置,管理员可以在运行时以安全的方式更改队列定义和属性(例如容量、ACL);支持随时停止队列,以确保在现有应用程序运行完成时,不会提交新应用程序(现有的应用程序继续完成)。
  7. 基于资源的调度:支持资源密集型应用程序,其中应用程序可以选择指定比默认值更高的资源需求
  8. 基于默认或用户自定义的分配规则的队列映射接口
  9. 绝对资源配置:管理员可以为队列指定绝对资源,而不是提供基于百分比的值
  10. 子队列的动态自动创建和管理:支持自动创建子队列

简单来说,就是事先划定好预算比例,例如现在拥有计算资源100个单位,维护两个用户的队列A和B,队列A都是比较核心的业务,队列B是不怎么急的业务,那么就可以事先指定好队列A百分比为80%,队列B为20%,那么假如现在有40个队列A业务正在进行,5个队列B的业务正在运行,这时候来了一个A业务一个B业务,那么,按照当前比例40/80和5/20,应该优先给队列B提交的这个业务分配资源。

源码分析

那么看一下handle()函数对于NODE_UPDATE的处理逻辑。

/*
* org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
* CapacityScheduler.java
* handle()函数中
*/
case NODE_UPDATE:
{
	NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
    nodeUpdate(nodeUpdatedEvent.getRMNode());
}
break;

此处调用了nodeUpdate()函数,负责执行调度算法的逻辑:

/*
* org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
* CapacityScheduler.java
*/
protected void nodeUpdate(RMNode rmNode) {
    long begin = System.nanoTime();
    readLock.lock();
    try {
    	//设置最近修改时间
      	setLastNodeUpdateTime(Time.now());
      	super.nodeUpdate(rmNode);
    } finally {
      	readLock.unlock();
    }
    // Try to do scheduling
    // 资源调度
    // 如果没有启用异步调度
    if (!scheduleAsynchronously) {
      writeLock.lock();
      try {
        // reset allocation and reservation stats before we start doing any work
        // 在开始做任何工作之前,更新分配和现有资源信息
        updateSchedulerHealth(lastNodeUpdateTime, rmNode.getNodeID(),
            CSAssignment.NULL_ASSIGNMENT);
		// 核心调度算法:分配Container
        allocateContainersToNode(rmNode.getNodeID(), true);
      } finally {
        writeLock.unlock();
      }
    }
    long latency = System.nanoTime() - begin;
    CapacitySchedulerMetrics.getMetrics().addNodeUpdate(latency);
  }

在核心调度算法allocateContainersToNode()中:

/*
* org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
* CapacityScheduler.java
*/
private void allocateContainersToNode(NodeId nodeId,
      boolean withNodeHeartbeat) {
    // 用于FIFO调度和Capacity调度的Node
    // 里面是一个Map<ContainerId, RMContainer>
    // 记录所有可用的Container
    FiCaSchedulerNode node = getNode(nodeId);
    if (null != node) {
      int offswitchCount = 0;
      int assignedContainers = 0;
      //获取到所有候选Container
      CandidateNodeSet<FiCaSchedulerNode> candidates = getCandidateNodeSet(
          node);
      // 执行Containers分配
      CSAssignment assignment = allocateContainersToNode(candidates,
          withNodeHeartbeat);
	  // ...
      }
    }
  }

这里调用了allocateContainersToNode()函数,在Hadoop3.3.1源码中写,它提供了两种分配方式:单一节点分配和多节点分配。但是,在Hadoop官方文档中,没有yarn.scheduler.capacity.multi-node-placement-enabled(默认false)这一配置,因此这里应该是执行单一节点分配的逻辑。可能是多分配是未来会提供的功能。

/*
* org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
* CapacityScheduler.java
*/
CSAssignment allocateContainersToNode(
      CandidateNodeSet<FiCaSchedulerNode> candidates,
      boolean withNodeHeartbeat) {
    // 检查Scheduler是否准备好,因为Scheduler是可插拔的
    if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext
        .isSchedulerReadyForAllocatingContainers()) {
      return null;
    }
    long startTime = System.nanoTime();
    // Backward compatible way to make sure previous behavior which allocation
    // driven by node heartbeat works.
    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
    // We have two different logics to handle allocation on single node / multi nodes.
    // 提供两种分配方式,单一节点分配和多节点分配
    // 在Hadoop3.3.1官方文档中,没有配置yarn.scheduler.capacity.multi-node-placement-enabled
    // 仅看单一节点分配
    CSAssignment assignment;
    if (!multiNodePlacementEnabled) {
      ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
          node.getNodeID());
      assignment = allocateContainerOnSingleNode(candidates,
          node, withNodeHeartbeat);
      ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
          node.getNodeID(), candidates.getPartition());
    } else{
      // 新的行为
    }

    if (assignment != null && assignment.getAssignmentInformation() != null
        && assignment.getAssignmentInformation().getNumAllocations() > 0) {
      long allocateTime = System.nanoTime() - startTime;
      CapacitySchedulerMetrics.getMetrics().addAllocate(allocateTime);
    }
    return assignment;
  }

这里执行了allocateContainerOnSingleNode()函数,分配Container到单一容器

/*
* org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
* CapacityScheduler.java
* 这里主要是进行预留资源的检查
*/
  /*
   * Logics of allocate container on a single node (Old behavior)
   */
  private CSAssignment allocateContainerOnSingleNode(
      CandidateNodeSet<FiCaSchedulerNode> candidates, FiCaSchedulerNode node,
      boolean withNodeHeartbeat) {
    LOG.debug("Trying to schedule on node: {}, available: {}",
        node.getNodeName(), node.getUnallocatedResource())
	//判断node是否还在
    if (getNode(node.getNodeID()) != node) {
      //此处报错,写日志(省略)
      return null;
    }
    // Assign new containers...
    // 1. Check for reserved applications
    // 2. Schedule if there are no reservations
    // 如果Application提前预留了资源,可以直接分配
    // 这里涉及了YARN Reservation System
    RMContainer reservedContainer = node.getReservedContainer();
    if (reservedContainer != null) {
      //此处报错,写日志(省略)
      return null;
    }
    // First check if we can schedule
    // When this time look at one node only, try schedule if the node
    // has any available or killable resource
    // 首先检查能不能分配
    // 如果空闲的的以及可以kill掉的node数目是0,报错
    if (calculator.computeAvailableContainers(Resources
            .add(node.getUnallocatedResource(), node.getTotalKillableResources()),
        minimumAllocation) <= 0) {
	  //此处报错,写日志(省略)
      return null;
    }
	// 调用allocateOrReserveNewContainers()
    return allocateOrReserveNewContainers(candidates, withNodeHeartbeat);
  }
private CSAssignment allocateOrReserveNewContainers(
      CandidateNodeSet<FiCaSchedulerNode> candidates,
      boolean withNodeHeartbeat) {
    //调用assignContainers()执行实际分配
    CSAssignment assignment = getRootQueue().assignContainers(
        getClusterResource(), candidates, new ResourceLimits(labelManager
            .getResourceByLabel(candidates.getPartition(),
                getClusterResource())),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);

	assignment.setSchedulingMode(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    submitResourceCommitRequest(getClusterResource(), assignment);
	//...
    return assignment;
  }

这里调用了assignContainers()函数,它的调用了assignContainersToChildQueues()函数来将Container分配给某个具体的作业队列。

/*
* org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
* ParentQueue.java
*/
private CSAssignment assignContainersToChildQueues(Resource cluster,
      CandidateNodeSet<FiCaSchedulerNode> candidates, ResourceLimits limits,
      SchedulingMode schedulingMode) {
    // ...
    // 尝试分配给最“under-serverd”的那个队列
    // sortAndGetChildrenAllocationIterator()获取到了按照资源占预先分配比例最小的list
    for (Iterator<CSQueue> iter = sortAndGetChildrenAllocationIterator(
        candidates.getPartition()); iter.hasNext(); ) {
      CSQueue childQueue = iter.next();
      LOG.debug("Trying to assign to queue: {} stats: {}",
          childQueue.getQueuePath(), childQueue);
      // 分配之前获取到队列的ResourceLimits
      ResourceLimits childLimits =
          getResourceLimitsOfChild(childQueue, cluster, limits.getNetLimit(),
              candidates.getPartition());

      CSAssignment childAssignment = childQueue.assignContainers(cluster,
          candidates, childLimits, schedulingMode);
      if(LOG.isDebugEnabled()) {
        LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
            " stats: " + childQueue + " --> " +
            childAssignment.getResource() + ", " + childAssignment.getType());
      }
	  // 如果这个队列分配的大于0,完成分配
      if (Resources.greaterThan(
              resourceCalculator, cluster, 
              childAssignment.getResource(), Resources.none())) {
        assignment = childAssignment;
        break;
      } else if {
      //...
    return assignment;
  }

这样就完成了分配,得到了这个assignment。

FIFO调度器

先入先出调度器是YARN最原始的调度器。
大数据理论与实践 源码分析 YARN资源调度策略
做法是将所有作业放入一个队列,先进队列的先获得资源,排在后面的作业只能等待。

源码分析

和capacity类似,我们只需要看nodeUpdate()函数即可。

/*
* org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo
* FifoScheduler.java
*/
protected synchronized void nodeUpdate(RMNode nm) {
    super.nodeUpdate(nm);
    FiCaSchedulerNode node = (FiCaSchedulerNode) getNode(nm.getNodeID());
	//...
    // A decommissioned node might be removed before we get here
    if (node != null &&
        Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(),
            node.getUnallocatedResource(), minimumAllocation)) {
      //...
      //分配的核心函数
      assignContainers(node);
	  //...
    }
    updateAvailableResourcesMetrics();
  }

此处调用了核心的分配函数assignContainers()

/*
* org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo
* FifoScheduler.java
*/
private void assignContainers(FiCaSchedulerNode node) {
    // Try to assign containers to applications in fifo order
    // 遍历所有application
    for (Map.Entry<ApplicationId, SchedulerApplication<FifoAppAttempt>> e : applications
        .entrySet()) {
      //获取到当前的Request
      FifoAppAttempt application = e.getValue().getCurrentAppAttempt();
      if (application == null) {
        continue;
      }
      LOG.debug("pre-assignContainers");
      application.showRequests();
      synchronized (application) {
        // 检查分配的资源是不是在黑名单上(经常失败的会上黑名单)
        if (SchedulerAppUtils.isPlaceBlacklisted(application, node, LOG)) {
          continue;
        }
		//优先级检查
        for (SchedulerRequestKey schedulerKey :
            application.getSchedulerKeys()) {
            //...
        }
      }
      LOG.debug("post-assignContainers");
      application.showRequests()
      // Done
      // 这里调用DefaultResourceCalculator,对比内存够不够用
      if (Resources.lessThan(resourceCalculator, getClusterResource(),
              node.getUnallocatedResource(), minimumAllocation)) {
        break;
      }
    }
    // Update the applications' headroom to correctly take into
    // account the containers assigned in this update.
    for (SchedulerApplication<FifoAppAttempt> application : applications.values()) {
      FifoAppAttempt attempt =
          (FifoAppAttempt) application.getCurrentAppAttempt();
      if (attempt == null) {
        continue;
      }
      updateAppHeadRoom(attempt);
    }
  }

可以看出就是每次取最前面的一个application,尝试分配给他资源,如果可以分配就继续分配直到无法分配或等待队列是空的。

公平调度器

简述

公平调度是一种将资源分配给应用程序的方法,以便所有应用程序在一段时间内平均获得相等的资源份额。默认情况下,Fair Scheduler 仅基于内存进行调度公平性决策。
大数据理论与实践 源码分析 YARN资源调度策略
核心思想通过平分的方式,动态分配资源,无需预先设定资源分配比例,即**“不提前做预算、见面分一半、实现绝对公平”**。
公平调度器支持分层队列。所有队列都来root队列。可用资源以典型的公平调度方式分布在根队列的子队列中。然后,子队列以同样的方式将分配给他们的资源分配给他们子队列。应用程序只能在叶子队列上调度。
当提交其他应用程序时,释放的资源会分配给新应用程序,以便每个应用程序最终获得大致相同数量的资源。与形成应用程序队列的默认Hadoop调度程序不同,这可以让短应用程序在合理的时间内完成,同时不会使长寿命应用程序挨饿。
公平的共享也适用于应用程序优先级,来确定每个应用程序应获得的总资源的比例。
默认下允许所有应用程序运行,但也可以通过配置文件限制每个用户和每个队列运行的应用程序数量。

源码阅读

还是看nodeUpdate()函数。

/*
* org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair
* FairScheduler.java
*/
  protected void nodeUpdate(RMNode nm) {
    writeLock.lock();
    try {
      long start = getClock().getTime();
      super.nodeUpdate(nm);
	  // 核心调度入口attemptScheduling(fsNode)方法
      FSSchedulerNode fsNode = getFSSchedulerNode(nm.getNodeID());
      attemptScheduling(fsNode);
      long duration = getClock().getTime() - start;
      fsOpDurations.addNodeUpdateDuration(duration);
    } finally {
      writeLock.unlock();
    }
  }

核心调度函数:

/*
* org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair
* FairScheduler.java
*/
void attemptScheduling(FSSchedulerNode node) {
    writeLock.lock();
    try {
      if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext
          .isSchedulerReadyForAllocatingContainers()) {
        return;
      }

      final NodeId nodeID = (node != null ? node.getNodeID() : null);
      if (!nodeTracker.exists(nodeID)) {
        // The node might have just been removed while this thread was waiting
        // on the synchronized lock before it entered this synchronized method
        LOG.info(
            "Skipping scheduling as the node " + nodeID + " has been removed");
        return;
      }
      // Assign new containers...
      // 1. 确保将容器分配给抢占的应用程序
      // 2. 检查预留的(reserved)的应用程序
      // 3. 如果没有预留,就分配
      // 应用程序可能会等待抢占的容器
      // 防止出现A从B中抢占Container,结果被不满足抢占条件的C给占了
      assignPreemptedContainers(node);
      FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
      boolean validReservation = false;
      if (reservedAppSchedulable != null) {
        validReservation = reservedAppSchedulable.assignReservedContainer(node);
      }
      if (!validReservation) {
        // No reservation, schedule at queue which is farthest below fair share
        int assignedContainers = 0;
        Resource assignedResource = Resources.clone(Resources.none());
        Resource maxResourcesToAssign = Resources.multiply(
            node.getUnallocatedResource(), 0.5f);
        while (node.getReservedContainer() == null) {
          //分配 Container 是从 ROOT 队列开始,调用 queueMgr.getRootQueue() 方法找到 ROOT 队列
          //然后调用 assignContainer(node) 方法
          Resource assignment = queueMgr.getRootQueue().assignContainer(node);

          if (assignment.equals(Resources.none())) {
            LOG.debug("No container is allocated on node {}", node);
            break;
          }
          assignedContainers++;
          Resources.addTo(assignedResource, assignment);
          if (!shouldContinueAssigning(assignedContainers, maxResourcesToAssign,
              assignedResource)) {
            break;
          }
        }
      }
      updateRootQueueMetrics();
    } finally {
      writeLock.unlock();
    }
  }

分配 Container 是从 ROOT 队列开始,调用 queueMgr.getRootQueue() 方法找到 ROOT 队列,然后调用 assignContainer(node) 方法

/*
* org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair
* FSParentQueue.java
*/
public Resource assignContainer(FSSchedulerNode node) {
    Resource assigned = Resources.none();
    // 如果超过了队列的 maxShare 则直接返回
    if (!assignContainerPreCheck(node)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Assign container precheck for queue " + getName() +
            " on node " + node.getNodeName() + " failed");
      }
      return assigned;
    }
	//单个条目未被锁定,并且可以更改,这意味着无法通过调用Sort()对子队列集合进行排序。
	//锁定每个子队列以防止更改会对性能产生很大影响。
	//不必处理队列移除的情况,因为在移除之前队列必须是空的。
	//将应用程序分配到队列和删除该队列都需要调度程序锁。
    TreeSet<FSQueue> sortedChildQueues = new TreeSet<>(policy.getComparator());
    readLock.lock();
    try {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Node " + node.getNodeName() + " offered to parent queue: " +
            getName() + " visiting " + childQueues.size() + " children");
      }
      sortedChildQueues.addAll(childQueues);
      for (FSQueue child : sortedChildQueues) {
        assigned = child.assignContainer(node);
        if (!Resources.equals(assigned, Resources.none())) {
          break;
        }
      }
    } finally {
      readLock.unlock();
    }
    return assigned;
  }

这里是 FSParentQueue 父队列的 assignContainer() 逻辑,对所有孩子节点进行遍历,递归调用该该方法,调用过程有两种情况:

  • 如果孩子节点是 FSParentQueue 父队列,则递归进入 FSParentQueue 类相同的逻辑中。
  • 如果孩子节点是 FSLeafQueue 叶子队列,则进入到下一步的调用逻辑。
/*
* org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair
* FSLeafQueue.java
* 孩子节点是FSLeafQueue叶子队列
*/
public Resource assignContainer(FSSchedulerNode node) {
    Resource assigned = none();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Node " + node.getNodeName() + " offered to queue: " +
          getName() + " fairShare: " + getFairShare());
    }
	// 检查是否超过队列的 maxShare 限制
    if (!assignContainerPreCheck(node)) {
      return assigned;
    }
	// 遍历叶子节点所有有资源需求的APP,并对其尝试分配Container
	// fetchAppsWithDemand()找到真正有资源需求的APP,过滤掉没有资源需求的APP
    for (FSAppAttempt sched : fetchAppsWithDemand(true)) {
      // 黑名单,跳过
      if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) {
        continue;
      }
      assigned = sched.assignContainer(node);
      if (!assigned.equals(none())) {
        LOG.debug("Assigned container in queue:{} container:{}",
            getName(), assigned);
        break;
      }
    }
    return assigned;
  }

这里获取叶子节点的 APP 调用了 fetchAppsWithDemand() 方法,该方法主要是对该队列所有 APP 进行遍历,找到真正有资源需求的 APP,过滤掉没有资源的 APP。
获取到叶子节点有资源需求的 APP 后,调用 FSAppAttempt 类的实例 assignContainer(node) 方法,进行接下来的分配逻辑:

/*
* org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair
* FSAppAttempt.java
* 主要检查队列已使用资源是否达到了用于运行AM的资源限制
*/
public Resource assignContainer(FSSchedulerNode node) {
	// 这里主要检查队列已使用资源是否达到了用于运行AM的资源限制
    if (isOverAMShareLimit()) {
      PendingAsk amAsk = appSchedulingInfo.getNextPendingAsk();
      updateAMDiagnosticMsg(amAsk.getPerAllocationResource(),
          " exceeds maximum AM resource allowed).");
      if (LOG.isDebugEnabled()) {
        LOG.debug("AM resource request: " + amAsk.getPerAllocationResource()
            + " exceeds maximum AM resource allowed, "
            + getQueue().dumpState());
      }
      return Resources.none();
    }
    return assignContainer(node, false);
  }

这里主要检查队列已使用资源是否达到了用于运行 AM 的资源限制,如果没有的话,则继续调度

private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Node offered to app: " + getName() + " reserved: " + reserved);
    }
	//对APP的所有ResourceRequest按照priority排序
    Collection<SchedulerRequestKey> keysToTry = (reserved) ?
        Collections.singletonList(
            node.getReservedContainer().getReservedSchedulerKey()) :
        getSchedulerKeys();

    // For each priority, see if we can schedule a node local, rack local
    // or off-switch request. Rack of off-switch requests may be delayed
    // (not scheduled) in order to promote better locality.
    writeLock.lock();
    try {
      // TODO (wandga): All logics in this method should be added to
      // SchedulerPlacement#canDelayTo which is independent from scheduler.
      // Scheduler can choose to use various/pluggable delay-scheduling
      // implementation.
      // 按照 priority 从高到低遍历所有 ResourceRequest
      for (SchedulerRequestKey schedulerKey : keysToTry) {
        // 判断该 Container 是否有预留
        // hasContainerForNode() 会分 node、rack、any 三种情况考虑该节点是否有合适的 Container
        if (!reserved && !hasContainerForNode(schedulerKey, node)) {
          continue;
        }
		// 调度机会计数加1
        addSchedulingOpportunity(schedulerKey);
		// 下面的逻辑主要根据NODE_LOCAL、RACK_LOCAL、OFF_SWITCH 三种情况判断该ResourceRequest满足哪一种调度方式
        PendingAsk rackLocalPendingAsk = getPendingAsk(schedulerKey,
            node.getRackName());
        PendingAsk nodeLocalPendingAsk = getPendingAsk(schedulerKey,
            node.getNodeName());
        if (nodeLocalPendingAsk.getCount() > 0
            && !appSchedulingInfo.canDelayTo(schedulerKey,
            node.getNodeName())) {
          LOG.warn("Relax locality off is not supported on local request: "
              + nodeLocalPendingAsk);
        }
		//省略三种情况的具体选择逻辑...
    return Resources.none();
  }

上面这段代码,主要是按照 priority 从高到低的顺序遍历所有的 ResourceRequest,针对每个 ResourceRequest,在待分配的node节点上,根据NODE_LOCAL、RACK_LOCAL、OFF_SWITCH三种情况判断该 ResourceRequest 满足哪一种调度方式,这里以NODE_LOCAL参数为例进入到下一步的调度逻辑。

/*
* org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair
* FSAppAttempt.java
* 为此节点分配一个容器
* 如果节点没有足够的内存,会创建一条预定信息(reservation)。一旦确定该节点应改被分配给特定请求,就会调用该函数
*/
private Resource assignContainer(
      FSSchedulerNode node, PendingAsk pendingAsk, NodeType type,
      boolean reserved, SchedulerRequestKey schedulerKey) {
    // 需要多少资源?
    Resource capability = pendingAsk.getPerAllocationResource();
    // 有多少资源?
    Resource available = node.getUnallocatedResource();

    Container reservedContainer = null;
    if (reserved) {
      reservedContainer = node.getReservedContainer().getContainer();
    }
    // available的资源够capability用吗?
    if (Resources.fitsIn(capability, available)) {
      // 如果资源够用,为这个request分配Container
      RMContainer allocatedContainer =
          allocate(type, node, schedulerKey, pendingAsk,
              reservedContainer);
      if (allocatedContainer == null) {
        // application是否需要此资源?
        if (reserved) {
          unreserve(schedulerKey, node);
        }
        LOG.debug("Resource ask {} fits in available node resources {},"
            + " but no container was allocated", capability, available);
        return Resources.none();
      }
      // 如果之前有资源预定reservation,删除
      if (reserved) {
        unreserve(schedulerKey, node);
      }
      // 通知node记录该分配出来的Container
      node.allocateContainer(allocatedContainer);
      // 如果不是非托管运行,我们分配的第一个容器总是AM。
      //设置此应用的AM资源并更新叶队列的AM使用情况
      if (!isAmRunning() && !getUnmanagedAM()) {
        setAMResource(capability);
        getQueue().addAMResourceUsage(capability);
        setAmRunning(true);
      }
      return capability;
    }
  	  // 此处处理抢占下的资源保留,省略...
      return Resources.none();
    }
  }

里面调用了allocate()方法:

public RMContainer allocate(NodeType type, FSSchedulerNode node,
      SchedulerRequestKey schedulerKey, PendingAsk pendingAsk,
      Container reservedContainer) {
    RMContainer rmContainer;
    Container container;
    writeLock.lock();
    try {
      // 更新locality级别,省略...
      // Required sanity check - AM can call 'allocate' to update resource
      // request without locking the scheduler, hence we need to check
      if (getOutstandingAsksCount(schedulerKey) <= 0) {
        return null;
      }
      container = reservedContainer;
      if (container == null) {
      	//这里会具体创建一个 Container 实例:
        container = createContainer(node, pendingAsk.getPerAllocationResource(),
            schedulerKey);
      }
      // 用RMContainer记录新创建出来的Container实例
      rmContainer = new RMContainerImpl(container, schedulerKey,
          getApplicationAttemptId(), node.getNodeID(),
          appSchedulingInfo.getUser(), rmContext);
      ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName());
      // 记录rmContainer
      //等待下次AM心跳发生时,会从这里把分配出来的Container带走
      addToNewlyAllocatedContainers(node, rmContainer);
      liveContainers.put(container.getId(), rmContainer);
      // Update consumption and track allocations
      ContainerRequest containerRequest = appSchedulingInfo.allocate(
            type, node, schedulerKey, rmContainer);
      this.attemptResourceUsage.incUsed(container.getResource());
      getQueue().incUsedResource(container.getResource());

      // Update resource requests related to "request" and store in RMContainer
      ((RMContainerImpl) rmContainer).setContainerRequest(containerRequest);

      // 发送Container的START事件,更新Container状态
      rmContainer.handle(
          new RMContainerEvent(container.getId(), RMContainerEventType.START));

      if (LOG.isDebugEnabled()) {
        LOG.debug("allocate: applicationAttemptId=" + container.getId()
            .getApplicationAttemptId() + " container=" + container.getId()
            + " host=" + container.getNodeId().getHost() + " type=" + type);
      }
      RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
          "SchedulerApp", getApplicationId(), container.getId(),
          container.getResource(), getQueueName(), null);
    } finally {
      writeLock.unlock();
    }
    return rmContainer;
  }

至此,一个全新的 Container 已经分配出来了!

参考

Apache Hadoop YARN 官方文档
Apache Hadoop 3.3.1源码
【Yarn源码分析】FairScheduler资源调度 - 笨小康(这篇写的真的挺好,强烈推荐,学到很多!)


2021年11月22日23:48:04:其实容量调度器的代码没看太明白,一知半解,回头再补上吧。

上一篇:大数据学习——配置并启动集群/开启历史服务器和日志聚集


下一篇:Hadoop集群搭建