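Preface
The article "Speculative Execution of MapReduce on Hadoop 2.6.0 (Part 1)" introduced the background of speculative execution and ran some tests on a Hadoop cluster to verify speculative execution in the MapReduce framework. It then walked through the source code for the initialization and startup of DefaultSpeculator, covering the instantiation of DefaultSpeculator, the instantiation and initialization of LegacyTaskRuntimeEstimator, the SpeculatorEventDispatcher that handles SpeculatorEvent events, and how DefaultSpeculator handles a SpeculatorEvent of type JOB_CREATE. All of that only completes the initialization of DefaultSpeculator, so when does DefaultSpeculator actually start speculating about tasks?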
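Task Attempt Launch and Speculation
After the ResourceManager has allocated and launched a Container to run MRAppMaster for the job, MRAppMaster obtains all tasks of the current job and creates a TaskAttemptImpl object for each task; this object represents one attempt to run the task. Once the attempt has been assigned a Container and its task has been launched, TaskAttemptImpl receives an event of type TaskAttemptEventType.TA_CONTAINER_LAUNCHED and its state transitions accordingly (for the state machine implementation, see "An Analysis of the Underlying YARN State Machine Implementation in Hadoop 2.6.0"). The transition that TaskAttemptImpl registers for TaskAttemptEventType.TA_CONTAINER_LAUNCHED is:
// Transitions from the ASSIGNED state.
.addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.RUNNING,
    TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
    new LaunchedContainerTransition())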
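As described in "An Analysis of the Underlying YARN State Machine Implementation in Hadoop 2.6.0", this ends up calling the transition method of LaunchedContainerTransition to complete the state transition. That method constructs a SpeculatorEvent and hands it to the event handler:
taskAttempt.eventHandler.handle
    (new SpeculatorEvent
        (taskAttempt.attemptId, true, taskAttempt.clock.getTime()));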
The SpeculatorEvent constructor called here is implemented as follows:
public SpeculatorEvent(TaskAttemptId attemptID, boolean flag, long timestamp) {
  super(Speculator.EventType.ATTEMPT_START, timestamp);
  this.reportedStatus = new TaskAttemptStatus();
  this.reportedStatus.id = attemptID;
  this.taskID = attemptID.getTaskId();
}
The SpeculatorEvent constructed here has type Speculator.EventType.ATTEMPT_START. As explained in the discussion of SpeculatorEventDispatcher's handle method in "Speculative Execution of MapReduce on Hadoop 2.6.0 (Part 1)", a SpeculatorEvent is ultimately handled by DefaultSpeculator's processSpeculatorEvent method. The branch that matches events of type Speculator.EventType.ATTEMPT_START is:
case ATTEMPT_START:
{
  LOG.info("ATTEMPT_START " + event.getTaskID());
  estimator.enrollAttempt
      (event.getReportedStatus(), event.getTimestamp());
  break;
}
Taking the default LegacyTaskRuntimeEstimator as an example, this actually calls the enrollAttempt method of its parent class StartEndTimesBase:
@Override
public void enrollAttempt(TaskAttemptStatus status, long timestamp) {
  startTimes.put(status.id, timestamp);
}
Here startTimes caches the mapping from a TaskAttemptId to the time at which the corresponding TaskAttemptImpl instance started. It is declared as:
protected final Map<TaskAttemptId, Long> startTimes
    = new ConcurrentHashMap<TaskAttemptId, Long>();
Task Attempt Updates and Speculation
Whenever a running task attempt reports information to MRAppMaster, the TaskAttemptImpl object receives an event of type TaskAttemptEventType.TA_UPDATE. The relevant state-machine code in TaskAttemptImpl is:
// Transitions from RUNNING state.
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING,
    TaskAttemptEventType.TA_UPDATE, new StatusUpdater())
// other state transitions unrelated to TaskAttemptEventType.TA_UPDATE omitted
// Transitions from COMMIT_PENDING state
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
    TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_UPDATE,
    new StatusUpdater())
This in turn executes the transition method of StatusUpdater. The part of that method related to speculative execution is:
// send event to speculator about the reported status
taskAttempt.eventHandler.handle
    (new SpeculatorEvent
        (taskAttempt.reportedStatus, taskAttempt.clock.getTime()));
The SpeculatorEvent constructor used here is:
public SpeculatorEvent(TaskAttemptStatus reportedStatus, long timestamp) {
  super(Speculator.EventType.ATTEMPT_STATUS_UPDATE, timestamp);
  this.reportedStatus = reportedStatus;
}
The SpeculatorEvent constructed here has type Speculator.EventType.ATTEMPT_STATUS_UPDATE. As explained in the discussion of SpeculatorEventDispatcher's handle method in "Speculative Execution of MapReduce on Hadoop 2.6.0 (Part 1)", a SpeculatorEvent is ultimately handled by DefaultSpeculator's processSpeculatorEvent method. The branch that matches events of type Speculator.EventType.ATTEMPT_STATUS_UPDATE is:
case ATTEMPT_STATUS_UPDATE:
  statusUpdate(event.getReportedStatus(), event.getTimestamp());
  break;
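DefaultSpeculator's statusUpdate method (shown below) updates the set of running tasks (the runningTasks cache) and the historical statistics of running task attempts (the runningTaskAttemptStatistics cache), and calls the estimator's updateAttempt method to update the status information of the task attempt.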
protected void statusUpdate(TaskAttemptStatus reportedStatus, long timestamp) {
  String stateString = reportedStatus.taskState.toString();
  TaskAttemptId attemptID = reportedStatus.id;
  TaskId taskID = attemptID.getTaskId();
  Job job = context.getJob(taskID.getJobId());
  if (job == null) {
    return;
  }
  Task task = job.getTask(taskID);
  if (task == null) {
    return;
  }
  estimator.updateAttempt(reportedStatus, timestamp);
  if (stateString.equals(TaskAttemptState.RUNNING.name())) {
    runningTasks.putIfAbsent(taskID, Boolean.TRUE);
  } else {
    runningTasks.remove(taskID, Boolean.TRUE);
    if (!stateString.equals(TaskAttemptState.STARTING.name())) {
      runningTaskAttemptStatistics.remove(attemptID);
    }
  }
}
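The bookkeeping at the end of statusUpdate can be tried out in isolation. The following is a minimal sketch, not Hadoop's class: RunningTasksSketch, its String ids, and the attemptStatistics map are hypothetical stand-ins chosen for illustration. It only shows how putIfAbsent and the two-argument remove on a ConcurrentMap behave for the RUNNING, STARTING, and other states.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Illustrative stand-in for the bookkeeping in statusUpdate; not Hadoop's class. */
final class RunningTasksSketch {
    // Hypothetical simplification: task and attempt ids are plain Strings.
    final ConcurrentMap<String, Boolean> runningTasks = new ConcurrentHashMap<>();
    final ConcurrentMap<String, Long> attemptStatistics = new ConcurrentHashMap<>();

    void statusUpdate(String taskId, String attemptId, String state) {
        if ("RUNNING".equals(state)) {
            // putIfAbsent leaves an existing entry alone, so repeated updates are harmless.
            runningTasks.putIfAbsent(taskId, Boolean.TRUE);
        } else {
            // The two-argument remove deletes the entry only if it still maps to TRUE.
            runningTasks.remove(taskId, Boolean.TRUE);
            if (!"STARTING".equals(state)) {
                attemptStatistics.remove(attemptId);
            }
        }
    }

    public static void main(String[] args) {
        RunningTasksSketch s = new RunningTasksSketch();
        s.attemptStatistics.put("attempt_0", 0L);
        s.statusUpdate("task_0", "attempt_0", "RUNNING");
        System.out.println(s.runningTasks.containsKey("task_0"));         // true
        s.statusUpdate("task_0", "attempt_0", "SUCCEEDED");
        System.out.println(s.runningTasks.containsKey("task_0"));         // false
        System.out.println(s.attemptStatistics.containsKey("attempt_0")); // false
    }
}
```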
Taking the default estimator implementation, LegacyTaskRuntimeEstimator, as an example, its updateAttempt method is implemented as follows.
@Override
public void updateAttempt(TaskAttemptStatus status, long timestamp) {
  super.updateAttempt(status, timestamp);
  TaskAttemptId attemptID = status.id;
  TaskId taskID = attemptID.getTaskId();
  JobId jobID = taskID.getJobId();
  Job job = context.getJob(jobID);
  if (job == null) {
    return;
  }
  Task task = job.getTask(taskID);
  if (task == null) {
    return;
  }
  TaskAttempt taskAttempt = task.getAttempt(attemptID);
  if (taskAttempt == null) {
    return;
  }
  Long boxedStart = startTimes.get(attemptID);
  long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;
  // We need to do two things.
  // 1: If this is a completion, we accumulate statistics in the superclass
  // 2: If this is not a completion, we learn more about it.
  // This is not a completion, but we're cooking.
  //
  if (taskAttempt.getState() == TaskAttemptState.RUNNING) {
    // See if this task is already in the registry
    AtomicLong estimateContainer = attemptRuntimeEstimates.get(taskAttempt);
    AtomicLong estimateVarianceContainer
        = attemptRuntimeEstimateVariances.get(taskAttempt);
    if (estimateContainer == null) {
      if (attemptRuntimeEstimates.get(taskAttempt) == null) {
        attemptRuntimeEstimates.put(taskAttempt, new AtomicLong());
        estimateContainer = attemptRuntimeEstimates.get(taskAttempt);
      }
    }
    if (estimateVarianceContainer == null) {
      attemptRuntimeEstimateVariances.putIfAbsent(taskAttempt, new AtomicLong());
      estimateVarianceContainer = attemptRuntimeEstimateVariances.get(taskAttempt);
    }
    long estimate = -1;
    long varianceEstimate = -1;
    // This code assumes that we'll never consider starting a third
    // speculative task attempt if two are already running for this task
    if (start > 0 && timestamp > start) {
      estimate = (long) ((timestamp - start) / Math.max(0.0001, status.progress));
      varianceEstimate = (long) (estimate * status.progress / 10);
    }
    if (estimateContainer != null) {
      estimateContainer.set(estimate);
    }
    if (estimateVarianceContainer != null) {
      estimateVarianceContainer.set(varianceEstimate);
    }
  }
}
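Before analyzing the updateAttempt code above, note the following definitions:
- timestamp: the timestamp of this status update
- start: the time at which the TaskAttemptImpl instance started, that is, when it was assigned a Container and began trying to run the task
- status.progress: the progress of the TaskAttemptImpl instance so far, a floating-point number
From the code above it is easy to read off how the estimated total runtime of the task attempt (estimate) and the variance estimate (varianceEstimate) are computed. Both formulas apply only when start > 0 and timestamp > start; otherwise both values stay at -1.
estimate = (timestamp - start) / max(0.0001, status.progress)
varianceEstimate = estimate * status.progress / 10
For any progress of at least 0.0001, the second formula reduces to (timestamp - start) / 10, up to integer truncation.
estimateContainer and estimateVarianceContainer are both AtomicLong instances; they hold the estimate and the variance estimate respectively.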
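To make the arithmetic concrete, here is a small worked sketch of the two formulas. It is not the Hadoop implementation: RuntimeEstimateSketch and its String attempt ids are hypothetical simplifications, and only the start-time lookup and the estimate computation are reproduced. With a start time of 1,000 ms, an update at 11,000 ms, and progress 0.25, the attempt has used 10,000 ms for a quarter of its work, so the estimated total is 40,000 ms and the variance estimate is 1,000 ms.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified stand-in for the estimate arithmetic; not Hadoop's classes. */
final class RuntimeEstimateSketch {
    // Hypothetical registry: attempt id (a plain String here) -> start time in ms.
    private final Map<String, Long> startTimes = new ConcurrentHashMap<>();

    void enrollAttempt(String attemptId, long timestamp) {
        startTimes.put(attemptId, timestamp);
    }

    /** Returns {estimate, varianceEstimate}; both are -1 when no estimate can be made. */
    long[] updateAttempt(String attemptId, float progress, long timestamp) {
        Long boxedStart = startTimes.get(attemptId);
        long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;
        long estimate = -1;
        long varianceEstimate = -1;
        if (start > 0 && timestamp > start) {
            // Elapsed time divided by the fraction of work done so far.
            estimate = (long) ((timestamp - start) / Math.max(0.0001, progress));
            varianceEstimate = (long) (estimate * progress / 10);
        }
        return new long[] {estimate, varianceEstimate};
    }

    public static void main(String[] args) {
        RuntimeEstimateSketch sketch = new RuntimeEstimateSketch();
        sketch.enrollAttempt("attempt_0", 1_000L);
        long[] r = sketch.updateAttempt("attempt_0", 0.25f, 11_000L);
        // prints "estimate=40000 ms, varianceEstimate=1000 ms"
        System.out.println("estimate=" + r[0] + " ms, varianceEstimate=" + r[1] + " ms");
    }
}
```

An attempt that was never enrolled has no start time, so the sketch returns -1 for both values, matching the defaults in the code above.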
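Task Attempt Containers and Speculation
The Container state changes in the following three scenarios:
- when MRAppMaster schedules a task attempt and is about to request a Container for it;
- when a Container is being allocated for the task attempt;
- when Container allocation for the task attempt completes.
The TaskAttemptImpl state-machine code for the first scenario, requesting a Container, is:
.addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED,
    TaskAttemptEventType.TA_SCHEDULE, new RequestContainerTransition(false))
.addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED,
    TaskAttemptEventType.TA_RESCHEDULE, new RequestContainerTransition(true))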
The speculation-related code in RequestContainerTransition's transition method is:
// Tell any speculator that we're requesting a container
taskAttempt.eventHandler.handle
    (new SpeculatorEvent(taskAttempt.getID().getTaskId(), +1));
The TaskAttemptImpl state-machine code involved while a Container is being allocated for a task attempt is shown below; these transitions cover an attempt that is killed or fails during that phase:
.addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.KILLED,
    TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition(
        TaskAttemptStateInternal.KILLED, true))
.addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.FAILED,
    TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition(
        TaskAttemptStateInternal.FAILED, true))
// other state transitions omitted
// Transitions from the ASSIGNED state.
// other state transitions omitted
.addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.FAILED,
    TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
    new DeallocateContainerTransition(TaskAttemptStateInternal.FAILED, false))
The speculation-related code in DeallocateContainerTransition's transition method is:
// send event to speculator that we withdraw our container needs, if
// we're transitioning out of UNASSIGNED
if (withdrawsContainerRequest) {
  taskAttempt.eventHandler.handle
      (new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1));
}
The TaskAttemptImpl state-machine code for the completion of Container allocation for a task attempt is:
// Transitions from the UNASSIGNED state.
.addTransition(TaskAttemptStateInternal.UNASSIGNED,
    TaskAttemptStateInternal.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED,
    new ContainerAssignedTransition())
The speculation-related code in ContainerAssignedTransition's transition method is:
// send event to speculator that our container needs are satisfied
taskAttempt.eventHandler.handle
    (new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1));
All three kinds of state transition above use the same SpeculatorEvent constructor:
public SpeculatorEvent(TaskId taskID, int containersNeededChange) {
  super(Speculator.EventType.TASK_CONTAINER_NEED_UPDATE);
  this.taskID = taskID;
  this.containersNeededChange = containersNeededChange;
}
The SpeculatorEvent constructed here has type Speculator.EventType.TASK_CONTAINER_NEED_UPDATE. As explained in the discussion of SpeculatorEventDispatcher's handle method in "Speculative Execution of MapReduce on Hadoop 2.6.0 (Part 1)", a SpeculatorEvent is ultimately handled by DefaultSpeculator's processSpeculatorEvent method. The branch that matches events of type Speculator.EventType.TASK_CONTAINER_NEED_UPDATE is:
case TASK_CONTAINER_NEED_UPDATE:
{
  AtomicInteger need = containerNeed(event.getTaskID());
  need.addAndGet(event.containersNeededChange());
  break;
}
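The pattern running through all three sections is that the event type is fixed at construction time and later selects a branch in processSpeculatorEvent. The sketch below models only that pattern. SpeculatorEventSketch, its factory methods (standing in for the three overloaded constructors), and the returned description strings are hypothetical names introduced for illustration; none of them are Hadoop APIs.

```java
/** Illustrative model of construction-time event typing; not Hadoop's SpeculatorEvent. */
final class SpeculatorEventSketch {
    enum EventType { ATTEMPT_START, ATTEMPT_STATUS_UPDATE, TASK_CONTAINER_NEED_UPDATE }

    final EventType type;
    final String id;                   // attempt id or task id, as a plain String
    final int containersNeededChange;  // only meaningful for TASK_CONTAINER_NEED_UPDATE

    private SpeculatorEventSketch(EventType type, String id, int containersNeededChange) {
        this.type = type;
        this.id = id;
        this.containersNeededChange = containersNeededChange;
    }

    // Hypothetical factories standing in for the three overloaded constructors.
    static SpeculatorEventSketch attemptStart(String attemptId) {
        return new SpeculatorEventSketch(EventType.ATTEMPT_START, attemptId, 0);
    }

    static SpeculatorEventSketch statusUpdate(String attemptId) {
        return new SpeculatorEventSketch(EventType.ATTEMPT_STATUS_UPDATE, attemptId, 0);
    }

    static SpeculatorEventSketch containerNeedChange(String taskId, int change) {
        return new SpeculatorEventSketch(EventType.TASK_CONTAINER_NEED_UPDATE, taskId, change);
    }

    /** Follows the shape of the switch on event type, returning a description of the action. */
    static String process(SpeculatorEventSketch event) {
        switch (event.type) {
            case ATTEMPT_START:
                return "enroll " + event.id;
            case ATTEMPT_STATUS_UPDATE:
                return "update " + event.id;
            case TASK_CONTAINER_NEED_UPDATE:
                return "need " + event.id + " " + event.containersNeededChange;
            default:
                throw new IllegalArgumentException("unknown type " + event.type);
        }
    }

    public static void main(String[] args) {
        System.out.println(process(attemptStart("attempt_0")));
        System.out.println(process(statusUpdate("attempt_0")));
        System.out.println(process(containerNeedChange("task_0", -1)));
    }
}
```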
The containerNeed method (shown below) returns the counter of Containers needed by all map tasks or all reduce tasks of the current job, creating the counter on first use. The caller then adds the task attempt's change in demand (+1 for a new request, -1 for a request that was withdrawn or satisfied) to that counter.
private AtomicInteger containerNeed(TaskId taskID) {
  JobId jobID = taskID.getJobId();
  TaskType taskType = taskID.getTaskType();
  ConcurrentMap<JobId, AtomicInteger> relevantMap
      = taskType == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds;
  AtomicInteger result = relevantMap.get(jobID);
  if (result == null) {
    relevantMap.putIfAbsent(jobID, new AtomicInteger(0));
    result = relevantMap.get(jobID);
  }
  return result;
}
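The counter bookkeeping can be exercised on its own. The sketch below is an assumption-laden simplification rather than Hadoop code: ContainerNeedSketch, the String job ids, and the boolean isMap flag are hypothetical, and it uses computeIfAbsent in place of the get / putIfAbsent / get sequence shown above, which gives the same create-on-first-use behavior in one call.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative per-job container demand counters; not Hadoop's DefaultSpeculator. */
final class ContainerNeedSketch {
    // Hypothetical simplification: job ids are plain Strings.
    private final ConcurrentMap<String, AtomicInteger> mapContainerNeeds = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, AtomicInteger> reduceContainerNeeds = new ConcurrentHashMap<>();

    private AtomicInteger containerNeed(String jobId, boolean isMap) {
        ConcurrentMap<String, AtomicInteger> relevantMap =
            isMap ? mapContainerNeeds : reduceContainerNeeds;
        // Creates the counter atomically the first time a job is seen.
        return relevantMap.computeIfAbsent(jobId, id -> new AtomicInteger(0));
    }

    /** Applies a +1 or -1 change and returns the job's outstanding demand. */
    int onContainerNeedUpdate(String jobId, boolean isMap, int change) {
        return containerNeed(jobId, isMap).addAndGet(change);
    }

    public static void main(String[] args) {
        ContainerNeedSketch sketch = new ContainerNeedSketch();
        sketch.onContainerNeedUpdate("job_1", true, +1);  // first map attempt requests a Container
        sketch.onContainerNeedUpdate("job_1", true, +1);  // second map attempt requests a Container
        int outstanding = sketch.onContainerNeedUpdate("job_1", true, -1); // one request satisfied
        System.out.println("outstanding map containers: " + outstanding); // 1
    }
}
```

Map and reduce demand are kept in separate maps, so a change for one task type never affects the counter of the other.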
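Summary
From the analysis above, speculative execution of map tasks mainly involves the following: monitoring of a task attempt begins when the attempt is launched; estimates of the total runtime and of its variance are computed from the information the attempt reports to MRAppMaster while it runs; and the number of Containers the task needs is updated when a task attempt requires a new Container because of speculative execution.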