Hadoop2.6.0运行mapreduce之推断(speculative)执行(下)

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/51251757

前言

在《Hadoop2.6.0运行mapreduce之推断(speculative)执行(上)》一文中对推断执行技术的背景进行了介绍,并且在Hadoop集群上做了一些测试以验证mapreduce框架的推断执行。最后还从源码分析的角度解读了DefaultSpeculator的初始化和启动过程,其中涉及DefaultSpeculator的实例化、LegacyTaskRuntimeEstimator的实例化及初始化、处理SpeculatorEvent事件的SpeculatorEventDispatcher、DefaultSpeculator收到类型为JOB_CREATE的SpeculatorEvent事件时的处理等内容。经过以上过程,实际只是完成了DefaultSpeculator的初始化,那么DefaultSpeculator是什么时候真正开始进行任务推断的呢?

任务实例启动与推断

当ResourceManager已经为作业分配并启动一个Container用于执行MRAppMaster后,MRAppMaster会获取当前job的所有task,并为每一个task创建一个TaskAttemptImpl对象,此对象代表task的一次运行尝试。当此次尝试已经分配了Container并且启动其task的时候,TaskAttemptImpl将收到TaskAttemptEventType.TA_CONTAINER_LAUNCHED类型的事件,进而对TaskAttemptImpl对象的状态进行迁移(有关状态机的实现,请参阅《Hadoop2.6.0中YARN底层状态机实现分析》一文),TaskAttemptImpl处理TaskAttemptEventType.TA_CONTAINER_LAUNCHED事件的相关迁移实现如下:

     // Transitions from the ASSIGNED state.
     .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.RUNNING,
         TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
         new LaunchedContainerTransition())

根据《Hadoop2.6.0中YARN底层状态机实现分析》一文的内容,我们知道会最终调用LaunchedContainerTransition的transition方法,完成状态迁移,其实现逻辑中会

调用SpeculatorEvent的构造器,代码如下:

      taskAttempt.eventHandler.handle
          (new SpeculatorEvent
              (taskAttempt.attemptId, true, taskAttempt.clock.getTime()));

此处调用的SpeculatorEvent的构造器的实现如下:

  public SpeculatorEvent(TaskAttemptId attemptID, boolean flag, long timestamp) {
    super(Speculator.EventType.ATTEMPT_START, timestamp);
    this.reportedStatus = new TaskAttemptStatus();
    this.reportedStatus.id = attemptID;
    this.taskID = attemptID.getTaskId();
  }

可以看到这里构造的SpeculatorEvent事件的类型是Speculator.EventType.ATTEMPT_START。根据《Hadoop2.6.0运行mapreduce之推断(speculative)执行(上)》一文中对于SpeculatorEventDispatcher的handle方法的介绍,SpeculatorEvent事件最终交由DefaultSpeculator的processSpeculatorEvent方法处理。Speculator.EventType.ATTEMPT_START类型的事件匹配的代码如下:

      case ATTEMPT_START:
      {
        LOG.info("ATTEMPT_START " + event.getTaskID());
        estimator.enrollAttempt
            (event.getReportedStatus(), event.getTimestamp());
        break;
      }
以默认的LegacyTaskRuntimeEstimator为例,这里实际调用了LegacyTaskRuntimeEstimator的父类StartEndTimesBase的enrollAttempt方法,代码如下:

  @Override
  public void enrollAttempt(TaskAttemptStatus status, long timestamp) {
    startTimes.put(status.id,timestamp);
  }
其中startTimes用于缓存TaskAttemptId与TaskAttemptImpl实例启动的时间之间的映射,startTimes的类型定义如下:

  protected final Map<TaskAttemptId, Long> startTimes
      = new ConcurrentHashMap<TaskAttemptId, Long>();

调用StartEndTimesBase的enrollAttempt方法的根本意义在于开启Estimator对此任务实例的监控。

任务实例更新与推断

每当任务实例在运行过程中向MRAppMaster汇报信息时,TaskAttemptImpl对象将会收到TaskAttemptEventType.TA_UPDATE类型的事件,此时TaskAttemptImpl的状态机相关的代码如下:

     // Transitions from RUNNING state.
     .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING,
         TaskAttemptEventType.TA_UPDATE, new StatusUpdater())
     // 省略其它与TaskAttemptEventType.TA_UPDATE无关的状态迁移代码
     // Transitions from COMMIT_PENDING state
     .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
         TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_UPDATE,
         new StatusUpdater())
进而实际执行StatusUpdater的transition方法,其transition方法中与任务推断执行有关的代码实现如下:

      // send event to speculator about the reported status
      taskAttempt.eventHandler.handle
          (new SpeculatorEvent
              (taskAttempt.reportedStatus, taskAttempt.clock.getTime()));
这里的SpeculatorEvent的构造器如下:
  public SpeculatorEvent(TaskAttemptStatus reportedStatus, long timestamp) {
    super(Speculator.EventType.ATTEMPT_STATUS_UPDATE, timestamp);
    this.reportedStatus = reportedStatus;
  }
可以看到这里构造的SpeculatorEvent事件的类型是Speculator.EventType.ATTEMPT_STATUS_UPDATE。根据《Hadoop2.6.0运行mapreduce之推断(speculative)执行(一)》一文中对于SpeculatorEventDispatcher的handle方法的介绍,SpeculatorEvent事件最终交由DefaultSpeculator的processSpeculatorEvent方法处理。Speculator.EventType.ATTEMPT_STATUS_UPDATE类型的事件匹配的代码如下:

      case ATTEMPT_STATUS_UPDATE:
        statusUpdate(event.getReportedStatus(), event.getTimestamp());
        break;

DefaultSpeculator的statusUpdate方法(见代码清单8)主要用于更新正在运行的任务(runningTasks缓存)、正在运行任务实例的历史统计信息(runningTaskAttemptStatistics缓存)并调用estimator的updateAttempt方法更新任务实例的状态信息。

代码清单8 更新任务实例的状态信息

  protected void statusUpdate(TaskAttemptStatus reportedStatus, long timestamp) {

    String stateString = reportedStatus.taskState.toString();

    TaskAttemptId attemptID = reportedStatus.id;
    TaskId taskID = attemptID.getTaskId();
    Job job = context.getJob(taskID.getJobId());

    if (job == null) {
      return;
    }

    Task task = job.getTask(taskID);

    if (task == null) {
      return;
    }

    estimator.updateAttempt(reportedStatus, timestamp);

    if (stateString.equals(TaskAttemptState.RUNNING.name())) {
      runningTasks.putIfAbsent(taskID, Boolean.TRUE);
    } else {
      runningTasks.remove(taskID, Boolean.TRUE);
      if (!stateString.equals(TaskAttemptState.STARTING.name())) {
        runningTaskAttemptStatistics.remove(attemptID);
      }
    }
  }

以默认的estimator的实现LegacyTaskRuntimeEstimator为例,其updateAttempt方法的实现见代码清单9。

代码清单9 更新任务实例的状态信息

  @Override
  public void updateAttempt(TaskAttemptStatus status, long timestamp) {
    super.updateAttempt(status, timestamp);
    

    TaskAttemptId attemptID = status.id;
    TaskId taskID = attemptID.getTaskId();
    JobId jobID = taskID.getJobId();
    Job job = context.getJob(jobID);

    if (job == null) {
      return;
    }

    Task task = job.getTask(taskID);

    if (task == null) {
      return;
    }

    TaskAttempt taskAttempt = task.getAttempt(attemptID);

    if (taskAttempt == null) {
      return;
    }

    Long boxedStart = startTimes.get(attemptID);
    long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;

    // We need to do two things.
    //  1: If this is a completion, we accumulate statistics in the superclass
    //  2: If this is not a completion, we learn more about it.

    // This is not a completion, but we're cooking.
    //
    if (taskAttempt.getState() == TaskAttemptState.RUNNING) {
      // See if this task is already in the registry
      AtomicLong estimateContainer = attemptRuntimeEstimates.get(taskAttempt);
      AtomicLong estimateVarianceContainer
          = attemptRuntimeEstimateVariances.get(taskAttempt);

      if (estimateContainer == null) {
        if (attemptRuntimeEstimates.get(taskAttempt) == null) {
          attemptRuntimeEstimates.put(taskAttempt, new AtomicLong());

          estimateContainer = attemptRuntimeEstimates.get(taskAttempt);
        }
      }

      if (estimateVarianceContainer == null) {
        attemptRuntimeEstimateVariances.putIfAbsent(taskAttempt, new AtomicLong());
        estimateVarianceContainer = attemptRuntimeEstimateVariances.get(taskAttempt);
      }


      long estimate = -1;
      long varianceEstimate = -1;

      // This code assumes that we'll never consider starting a third
      //  speculative task attempt if two are already running for this task
      if (start > 0 && timestamp > start) {
        estimate = (long) ((timestamp - start) / Math.max(0.0001, status.progress));
        varianceEstimate = (long) (estimate * status.progress / 10);
      }
      if (estimateContainer != null) {
        estimateContainer.set(estimate);
      }
      if (estimateVarianceContainer != null) {
        estimateVarianceContainer.set(varianceEstimate);
      }
    }
  }
具体分析代码清单9前,先理解以下定义:
  • timestamp:本次状态更新的时间戳
  • start:TaskAttemptImpl实例启动即分配Container尝试运行Task的开始时间
  • status.progress:TaskAttemptImpl实例运行完成的进度值,是浮点数

因此从上面代码,我们不难看出任务实例运行需要的总时间的估值(estimate)和方差估值(varianceEstimate)的计算公式。

estimate = (timestamp - start)/status.progress

varianceEstimate = (timestamp - start)/10

estimateContainer和estimateVarianceContainer都是原子类型,分别用于保存估值(estimate)和方差估值(varianceEstimate)。


任务实例Container与推断

Container状态发生变化的场景有以下三种:

  • 当MRAppMaster调度任务实例,并为之将要请求Container时;
  • 为任务实例分配Container时;
  • 任务实例的Container分配完成时。
TaskAttemptImpl的状态机中涉及将要为任务实例请求Container的代码如下:

     .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED,
         TaskAttemptEventType.TA_SCHEDULE, new RequestContainerTransition(false))
     .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED,
         TaskAttemptEventType.TA_RESCHEDULE, new RequestContainerTransition(true))
RequestContainerTransition的transition方法中涉及推断的代码如下:

      // Tell any speculator that we're requesting a container
      taskAttempt.eventHandler.handle
          (new SpeculatorEvent(taskAttempt.getID().getTaskId(), +1));
TaskAttemptImpl的状态机中涉及为任务实例分配Container的代码如下:
     .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.KILLED,
         TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition(
             TaskAttemptStateInternal.KILLED, true))
     .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.FAILED,
         TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition(
             TaskAttemptStateInternal.FAILED, true))
     // 省略其它状态迁移代码
     // Transitions from the ASSIGNED state.
     // <span style="font-family: Arial, Helvetica, sans-serif;">省略其它状态迁移代码</span>
     .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.FAILED,
         TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
         new DeallocateContainerTransition(TaskAttemptStateInternal.FAILED, false))
DeallocateContainerTransition的transition方法中涉及推断的代码如下:
      // send event to speculator that we withdraw our container needs, if
      //  we're transitioning out of UNASSIGNED
      if (withdrawsContainerRequest) {
        taskAttempt.eventHandler.handle
            (new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1));
      }
TaskAttemptImpl的状态机中涉及的任务实例的Container分配完成的代码如下:
     // Transitions from the UNASSIGNED state.
     .addTransition(TaskAttemptStateInternal.UNASSIGNED,
         TaskAttemptStateInternal.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED,
         new ContainerAssignedTransition())

ContainerAssignedTransition的transition方法中涉及推断的代码如下:

      // send event to speculator that our container needs are satisfied
      taskAttempt.eventHandler.handle
          (new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1));
以上三种状态迁移中都使用了SpeculatorEvent的同一个构造器,代码如下:

  public SpeculatorEvent(TaskId taskID, int containersNeededChange) {
    super(Speculator.EventType.TASK_CONTAINER_NEED_UPDATE);
    this.taskID = taskID;
    this.containersNeededChange = containersNeededChange;
  }
可以看到这里构造的SpeculatorEvent事件的类型是Speculator.EventType.TASK_CONTAINER_NEED_UPDATE。根据《Hadoop2.6.0运行mapreduce之推断(speculative)执行(上)》一文中对于SpeculatorEventDispatcher的handle方法的介绍,SpeculatorEvent事件最终交由DefaultSpeculator的processSpeculatorEvent方法处理。Speculator.EventType.TASK_CONTAINER_NEED_UPDATE类型的事件匹配的代码如下:
      case TASK_CONTAINER_NEED_UPDATE:
      {
        AtomicInteger need = containerNeed(event.getTaskID());
        need.addAndGet(event.containersNeededChange());
        break;
      }
containerNeed方法(见代码清单10)用于获取当前作业的所有map或者reduce任务需要的Container数量。并将当前任务实例需要的资源数量(+1表示需要,-1表示释放)更新到当前作业的所有map或者reduce任务需要的Container数量中。

代码清单10 获取当前作业的所有map或者reduce任务需要的Container数量
  private AtomicInteger containerNeed(TaskId taskID) {
    JobId jobID = taskID.getJobId();
    TaskType taskType = taskID.getTaskType();

    ConcurrentMap<JobId, AtomicInteger> relevantMap
        = taskType == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds;

    AtomicInteger result = relevantMap.get(jobID);

    if (result == null) {
      relevantMap.putIfAbsent(jobID, new AtomicInteger(0));
      result = relevantMap.get(jobID);
    }

    return result;
  }

总结

从以上分析可以看出map任务的推断执行主要为:启动任务实例时开启对任务实例的监控;根据任务实例在运行过程中向MRAppMaster汇报信息计算运行总时长的估值和方差估值;当任务实例由于推断执行需要分配新的Container时对任务需要的Container数量进行更新。


后记:个人总结整理的《深入理解Spark:核心思想与源码分析》一书现在已经正式出版上市,目前京东、当当、天猫等网站均有销售,欢迎感兴趣的同学购买。


Hadoop2.6.0运行mapreduce之推断(speculative)执行(下)

京东:(现有满150送50活动)http://item.jd.com/11846120.html 

当当:http://product.dangdang.com/23838168.html 



上一篇:DELPHI控件具有多个属性的动态设置方法


下一篇:Hadoop2.6.0配置参数查看小工具