FairScheduler的任务调度机制——assignTasks(续)

上一篇文章浅析了FairScheduler的assignTasks()方法,介绍了FairScheduler任务调度的原理。略过了最后一步通过JobScheduler获取Task时调用JobInProgress的五个方法:obtainNewNodeLocalMapTask(),obtainNewNodeOrRackLocalMapTask(),obtainNewMapTask(),obtainNewReduceTask()。这篇文章将对这四个方法进行简单的源代码解析。obtainNewNodeLocalMapTask(),obtainNewNodeOrRackLocalMapTask(),obtainNewMapTask()这三个方法都是选择一个Map任务,其内部调用的方法也是一样的,都是obtainNewMapTaskCommon()方法,不同的只是maxCacheLevel参数值不同:

obtainNewNodeLocalMapTask():maxCacheLevel==1;

obtainNewNodeOrRackLocalMapTask:maxCacheLevel==maxLevel;

obtainNewMapTask:maxCacheLevel==anyCacheLevel(maxLevel+1)。

下面着重分析obtainNewMapTaskCommon()方法。

1.JobInProgress.obtainNewMapTaskCommon():

public synchronized Task obtainNewMapTaskCommon(
TaskTrackerStatus tts, int clusterSize, int numUniqueHosts,
int maxCacheLevel) throws IOException {
if (!tasksInited) {
LOG.info("Cannot create task split for " + profile.getJobID());
try { throw new IOException("state = " + status.getRunState()); }
catch (IOException ioe) {ioe.printStackTrace();}
return null;
} int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxCacheLevel,
status.mapProgress());
if (target == -1) {
return null;
} Task result = maps[target].getTaskToRun(tts.getTrackerName());
if (result != null) {
addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
// DO NOT reset for off-switch!
if (maxCacheLevel != NON_LOCAL_CACHE_LEVEL) {
resetSchedulingOpportunities();
}
}
return result;
}

首先判断Job是否初始化,即tasksInited是否为true,未初始化则不调度任务。接着调用findNewMapTask()方法获取一个新的Map任务,同时将maxCacheLevel参数传递过去,该参数的作用是选择不同LocalLevel的Map任务。下面看看findNewMapTask()方法。

2.JobInProgress.findNewMapTask():

首先介绍下该方法的返回值,该方法不是直接返回一个MapTask,而是返回一个maps[]数组的索引值,这个maps[]数组在Job初始化时创建,存放该Job所有的Map任务,根据索引值就可以知道对应的MapTask。

if (numMapTasks == 0) {
if(LOG.isDebugEnabled()) {
LOG.debug("No maps to schedule for " + profile.getJobID());
}
return -1;
}

首先判断该Job的Map任务数是否为0,numMapTasks是在Job进行初始化(initTasks()方法)时根据输入文件的分片数确定的,即一个Split对应一个Map任务。该值为0表示该Job没有Map任务,所以返回-1,即没有满足条件的Map任务。

String taskTracker = tts.getTrackerName();
TaskInProgress tip = null; //
// Update the last-known clusterSize
//
this.clusterSize = clusterSize; if (!shouldRunOnTaskTracker(taskTracker)) {
return -1;
}

判断是否能够在该TT上运行任务,主要根据该Job在该TT上是否有过失败的任务来判断。下面看看该方法。

3.JobInProgress.shouldRunOnTaskTracker():

private boolean shouldRunOnTaskTracker(String taskTracker) {
//
// Check if too many tasks of this job have failed on this
// tasktracker prior to assigning it a new one.
//
int taskTrackerFailedTasks = getTrackerTaskFailures(taskTracker);
if ((flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) &&
taskTrackerFailedTasks >= maxTaskFailuresPerTracker) {
if (LOG.isDebugEnabled()) {
String flakyTracker = convertTrackerNameToHostName(taskTracker);
LOG.debug("Ignoring the black-listed tasktracker: '" + flakyTracker
+ "' for assigning a new task");
}
return false;
}
return true;
}
  private int getTrackerTaskFailures(String trackerName) {
String trackerHostName = convertTrackerNameToHostName(trackerName);
Integer failedTasks = trackerToFailuresMap.get(trackerHostName);
return (failedTasks != null) ? failedTasks.intValue() : 0;
}

该方法从Job中保存的trackerToFailuresMap队列中获取该TT上所有的失败任务数。提一下,trackerToFailuresMap队列信息也是在TT通过心跳向JT时更新的,即updateTaskStatus()方法。flakyTaskTrackers值记录该Job在多少个TT上面失败的任务数大于maxTaskFailuresPerTracker(即一个Job在一个TT上可允许失败的最大数),当一个Job在一个TT上拥有的失败任务数大于maxTaskFailuresPerTracker时则表示该Job不可再在该TT上执行任何任务,但是当一个Job在超过(clusterSize * CLUSTER_BLACKLIST_PERCENT)个TT上失败的话,则不去考虑该Job是否在该TT上失败,因为可能是Job自身的问题,而非单个TT的问题。总之该方法根据Job的失败任务信息来判断是否应该在一个TT上执行任务。

接着返回到JobInProgress.findNewMapTask()方法。

4.JobInProgress.findNewMapTask():

// Check to ensure this TaskTracker has enough resources to
// run tasks from this job
long outSize = resourceEstimator.getEstimatedMapOutputSize();
long availSpace = tts.getResourceStatus().getAvailableSpace();
if(availSpace < outSize) {
LOG.warn("No room for map task. Node " + tts.getHost() +
" has " + availSpace +
" bytes free; but we expect map to take " + outSize); return -1; //see if a different TIP might work better.
}

判断该TT是否有够该Job的Map任务使用的资源,主要是根据该Job已完成的Map任务的输出情况来估算一个Map任务可能的输出大小。

long getEstimatedMapOutputSize() {
long estimate = 0L;
if (job.desiredMaps() > 0) {
estimate = getEstimatedTotalMapOutputSize() / job.desiredMaps();
}
return estimate;
}
protected synchronized long getEstimatedTotalMapOutputSize()  {
if(completedMapsUpdates < threshholdToUse) {
return 0;
} else {
long inputSize = job.getInputLength() + job.desiredMaps();
//add desiredMaps() so that randomwriter case doesn't blow up
//the multiplication might lead to overflow, casting it with
//double prevents it
long estimate = Math.round(((double)inputSize *
completedMapsOutputSize * 2.0)/completedMapsInputSize);
if (LOG.isDebugEnabled()) {
LOG.debug("estimate total map output will be " + estimate);
}
return estimate;
}
}

具体估算方法是根据(该Job的输入大小/已完成的Map任务的输入大小)*(该Job已完成的所有Map任务的总输出大小)*2估算出该Job全部Map任务大概的输出大小,然后除以该Job的Map数量即一个Map任务的可能输出大小(至于这些值的跟新基本都是通过心跳通信)。如果TT上可使用的资源小于该Job一个Map任务可能的输出大小则不能在该TT上执行Map任务。

5.JobInProgress.findNewMapTask():

接下来就是该方法的关键部分,首先看下作者们对该部分的一个注释:

// When scheduling a map task:
// 0) Schedule a failed task without considering locality
// 1) Schedule non-running tasks
// 2) Schedule speculative tasks
// 3) Schedule tasks with no location information // First a look up is done on the non-running cache and on a miss, a look
// up is done on the running cache. The order for lookup within the cache:
// 1. from local node to root [bottom up]
// 2. breadth wise for all the parent nodes at max level
// We fall to linear scan of the list ((3) above) if we have misses in the
// above caches

第一部分主要是说明选择任务的顺序:失败的Task(不去考虑本地性),未运行的任务,推测执行的任务,输入文件没有对应的Location信息的任务。第二部分是说明在选择每个任务时对集群上所有节点的遍历方式:自下往上一次遍历以及从根节点横向遍历。

下面来看第一中选择方式:从失败的任务中选择。

 // 0) Schedule the task with the most failures, unless failure was on this
// machine
tip = findTaskFromList(failedMaps, tts, numUniqueHosts, false);
if (tip != null) {
// Add to the running list
scheduleMap(tip);
LOG.info("Choosing a failed task " + tip.getTIPId());
return tip.getIdWithinJob();
}

failedMaps中存放着所有的失败任务信息,直接调用findTaskFromList()方法从failedMaps中选择一个任务。下面三种方式也都是调用该方法,不同的只是传入的List不同,所以看下findTaskFromList()方法。

6.JobInProgress.findTaskFromList():

private synchronized TaskInProgress findTaskFromList(
Collection<TaskInProgress> tips, TaskTrackerStatus ttStatus,
int numUniqueHosts,
boolean removeFailedTip) {
Iterator<TaskInProgress> iter = tips.iterator();
while (iter.hasNext()) {
TaskInProgress tip = iter.next(); // Select a tip if
// 1. runnable : still needs to be run and is not completed
// 2. ~running : no other node is running it
// 3. earlier attempt failed : has not failed on this host
// and has failed on all the other hosts
// A TIP is removed from the list if
// (1) this tip is scheduled
// (2) if the passed list is a level 0 (host) cache
// (3) when the TIP is non-schedulable (running, killed, complete)
if (tip.isRunnable() && !tip.isRunning()) {
// check if the tip has failed on this host
if (!tip.hasFailedOnMachine(ttStatus.getHost()) ||
tip.getNumberOfFailedMachines() >= numUniqueHosts) {
// check if the tip has failed on all the nodes
iter.remove();
return tip;
} else if (removeFailedTip) {
// the case where we want to remove a failed tip from the host cache
// point#3 in the TIP removal logic above
iter.remove();
}
} else {
// see point#3 in the comment above for TIP removal logic
iter.remove();
}
}
return null;
}

该方法的作用是从一个TaskInProgress列表中选择一个适合在TT上执行的Task》从代码中的注释可以看出选择的前提是Task是可运行的(!failed && (completes == 0),即未失败也未完成),且非正在运行中(no other node is running it),且该Task没有在该TT所在的HOST上有过失败任务(一个Task会存在多个TaskAttempt任务,TaskAttempt听名字就可以知道是一个Task的多次尝试执行,失败了就再来一次,再失败再来,直到超出一个限度才会标志这个Task失败),或者该Task的失败次数大于等于集群中所有的HOST数量(表示该Task在所有HOST都失败过),满足上面三个条件的Task即可返回。后面就是判断是否将该Task从队列中移除,注释给出了三种会移除的情况:该Task已被调度,即被选中;选择的Task的本地化等级是NODE;该Task处于不可运行状态(运行中,或者完成,或者被kill掉了)。了解了该方法的原理则后面的内容就简单了。下面回到JobInProgress.findNewMapTask()方法。

7.JobInProgress.findNewMapTask():

tip = findTaskFromList(failedMaps, tts, numUniqueHosts, false);
if (tip != null) {
// Add to the running list
scheduleMap(tip);
LOG.info("Choosing a failed task " + tip.getTIPId());
return tip.getIdWithinJob();
}

依旧看这段代码,当findTaskFromList()方法成功返回一个Task后,需要将该Task添加到运行中的队列去,

protected synchronized void scheduleMap(TaskInProgress tip) {

    if (runningMapCache == null) {
LOG.warn("Running cache for maps is missing!! "
+ "Job details are missing.");
return;
}
String[] splitLocations = tip.getSplitLocations(); // Add the TIP to the list of non-local running TIPs
if (splitLocations == null || splitLocations.length == 0) {
nonLocalRunningMaps.add(tip);
return;
} for(String host: splitLocations) {
Node node = jobtracker.getNode(host); for (int j = 0; j < maxLevel; ++j) {
Set<TaskInProgress> hostMaps = runningMapCache.get(node);
if (hostMaps == null) {
// create a cache if needed
hostMaps = new LinkedHashSet<TaskInProgress>();
runningMapCache.put(node, hostMaps);
}
hostMaps.add(tip);
node = node.getParent();
}
}
}

该方法主要将被选中的Task(这里任务都是Map任务)添加到Job中保存运行中任务信息的队列中(nonLocalRunningMaps和runningMapCache),nonLocalRunningMaps保存那些输入文件没有Location信息的Task,而runningMapCache则保存输入文件存在Location的Task。runningMapCache是一个Map,key是Node,而value是一个TaskInProgress对象的集合,说明该Map保持的是一个Node-->其上运行的所有Task的一个映射关系,这里的Node是拥有该Task的输入文件块的所有节点。当有一个Task需要添加到runningMapCache时不仅需要为其建立到Task的输入文件所在的所有Node到该Task的关系,而且分别为其建立输入文件所在Node的父节点到该Task的关系,循环知道遍历的深度等于maxLevel。这样做的好处是可以很方便的知道一个Node上运行的所有Task信息,包括其子节点上运行的Task。继续返回到JobInProgress.findNewMapTask()方法。

接下来就是简单地返回该Task在maps[]数组中的索引值。到这里第一种选择Task的方式完成了,下面看看后面几种方式。

8.JobInProgress.findNewMapTask():

Node node = jobtracker.getNode(tts.getHost());

获取该TT所在的HOST对应的Node对象。

// 1. check from local node to the root [bottom up cache lookup]
// i.e if the cache is available and the host has been resolved
// (node!=null)
if (node != null) {
Node key = node;
int level = 0;
// maxCacheLevel might be greater than this.maxLevel if findNewMapTask is
// called to schedule any task (local, rack-local, off-switch or
// speculative) tasks or it might be NON_LOCAL_CACHE_LEVEL (i.e. -1) if
// findNewMapTask is (i.e. -1) if findNewMapTask is to only schedule
// off-switch/speculative tasks
int maxLevelToSchedule = Math.min(maxCacheLevel, maxLevel);
for (level = 0;level < maxLevelToSchedule; ++level) {
List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
if (cacheForLevel != null) {
tip = findTaskFromList(cacheForLevel, tts,
numUniqueHosts,level == 0);
if (tip != null) {
// Add to running cache
scheduleMap(tip); // remove the cache if its empty
if (cacheForLevel.size() == 0) {
nonRunningMapCache.remove(key);
} return tip.getIdWithinJob();
}
}
key = key.getParent();
} // Check if we need to only schedule a local task (node-local/rack-local)
if (level == maxCacheLevel) {
return -1;
}
}

这一部分是从未运行的Map任务中选择一个可执行的Map任务。首先计算maxLevelToSchedule,该值是maxCacheLevel和maxLevel的较小的值,注释给出的解释是maxCacheLevel(调用该方法传入的参数值)可能会比该Job的maxLevel属性值大,所以选择两者之中最小的值作为选择的最大本地等级值(maxLevelToSchedule)。接下来就是自下往上寻找满足条件的Map任务,知道遍历深度达到maxLevelToSchedule。方法较简单,只是从nonRunningMapCache中选择出对应的Node上所有的未执行的Map任务集合,然后调用同上面一样的findTaskFromList()方法从TaskInProgress集合中选择一个适合在该TT上执行的Map任务,选择一个Map任务之后还是一样的步骤调用scheduleMap()方法将其添加到运行中的队列中。当循环结束之后,如果未选择出一个Map任务,则到下面判断如果level==maxCacheLevel,这里level是循环结束时的值,即level==maxLevelToSchedule,而maxLevelToSchedule==Math.min(maxCacheLevel, maxLevel),那么如果要使level==maxCacheLevel,则maxCacheLevel必须是小于等于maxLevel,从前面三个方法内部调用obtainNewMapTaskCommon()方法时传的maxCacheLevel参数值可以看出,obtainNewNodeLocalMapTask()传的值是1,obtainNewNodeOrRackLocalMapTask()传的值是maxLevel,而obtainNewMapTask传的值是anyCacheLevel(=maxLevel+1),所以这里满足level==maxCacheLevel条件的是obtainNewNodeLocalMapTask和obtainNewNodeOrRackLocalMapTask两个方法,即选择Node级别和TackNode级别的Map任务。而对于这两个任务是不需要进行下面两种方式选择Map任务的:推测执行任务和NoLocal任务,因为这两个方式选择的任务都不满足Node级别和TackNode级别,而是Any级别的,即也就只有obtainNewMapTask()这一个方法(其实还有一个方法obtainNewNonLocalMapTask(),传的maxCacheLevel参数值是NON_LOCAL_CACHE_LEVEL,即-1,这个方法会跳过注视中的1)方式)。下面继续看如何从根节点横向选择Map任务。

//2. Search breadth-wise across parents at max level for non-running
// TIP if
// - cache exists and there is a cache miss
// - node information for the tracker is missing (tracker's topology
// info not obtained yet) // collection of node at max level in the cache structure
Collection<Node> nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel(); // get the node parent at max level
Node nodeParentAtMaxLevel =
(node == null) ? null : JobTracker.getParentNode(node, maxLevel - 1); for (Node parent : nodesAtMaxLevel) { // skip the parent that has already been scanned
if (parent == nodeParentAtMaxLevel) {
continue;
} List<TaskInProgress> cache = nonRunningMapCache.get(parent);
if (cache != null) {
tip = findTaskFromList(cache, tts, numUniqueHosts, false);
if (tip != null) {
// Add to the running cache
scheduleMap(tip); // remove the cache if empty
if (cache.size() == 0) {
nonRunningMapCache.remove(parent);
}
LOG.info("Choosing a non-local task " + tip.getTIPId());
return tip.getIdWithinJob();
}
}
}

这一种方式直接从根节点集合中选择任务,JT中nodesAtMaxLevel集合保存着所有没有父节点的Node信息,即在集群中处于*等的Node,下面就是直接遍历nodesAtMaxLevel中所有的节点选择满足条件的Map任务。同上一步也是从nonRunningMapCache集合中选择出对应Node上所有的未运行的Map任务,该方法基本同上一步一样,只是选择的Node不同。上一种方式是从TT所在的Node开始,自下而上选择Map任务,而此处则直接选择最高等级的Node上的Map任务。显然这一步不考虑任何的Map任务本地化因素。下面再看如何选择No-Local任务。

// 3. Search non-local tips for a new task
tip = findTaskFromList(nonLocalMaps, tts, numUniqueHosts, false);
if (tip != null) {
// Add to the running list
scheduleMap(tip); LOG.info("Choosing a non-local task " + tip.getTIPId());
return tip.getIdWithinJob();
}

这一种方式是从nonLocalMaps中选择一个Map任务,nonLocalMaps保存的任务是那些在任务初始化时未找到输入文件所在的Location信息的任务,这些任务是无法放到nonRunningMapCache中的。

以上三种方式其实都是一种方式——对应注释上的1)——选择未运行的任务,只是这里分成三种不同的选择方式:1)从TT所在节点自下而上选择满足Node和RackNode本地化要求的任务,2)直接从所有最高等级的Node上选择任务,3)选择那些输入文件没有Location信息的No-Local任务。下面接着看注释中提到的第三种选择方式——选择推测执行任务。

9.JobInProgress.findNewMapTask():

这一中方式同上面一种方式一样,也分为三个不同的选择方式(同上)。当然,这一中方法发生的条件是hasSpeculativeMaps==true,即该Job拥有推测执行任务,或者说可以启用推测执行任务,该参数由mapred.map.tasks.speculative.execution参数值决定,默认是true。下面分别看看三种方式。

// 1. Check bottom up for speculative tasks from the running cache
if (node != null) {
Node key = node;
for (int level = 0; level < maxLevel; ++level) {
Set<TaskInProgress> cacheForLevel = runningMapCache.get(key);
if (cacheForLevel != null) {
tip = findSpeculativeTask(cacheForLevel, tts,
avgProgress, currentTime, level == 0);
if (tip != null) {
if (cacheForLevel.size() == 0) {
runningMapCache.remove(key);
}
return tip.getIdWithinJob();
}
}
key = key.getParent();
}
}

第一种方式同上一种方式一样,依然是选择本地化Map任务(推测任务),不同的是这里是从runningMapCache中选择出Node上所有正在运行中的Task集合,从中选择一个Map任务对其启动推测执行任务。下面看看findSpeculativeTask()方法。

10.JobInProgress.findSpeculativeTask():

protected synchronized TaskInProgress findSpeculativeTask(
Collection<TaskInProgress> list, TaskTrackerStatus ttStatus,
double avgProgress, long currentTime, boolean shouldRemove) { Iterator<TaskInProgress> iter = list.iterator(); while (iter.hasNext()) {
TaskInProgress tip = iter.next();
// should never be true! (since we delete completed/failed tasks)
if (!tip.isRunning() || !tip.isRunnable()) {
iter.remove();
continue;
} if (tip.hasSpeculativeTask(currentTime, avgProgress)) {
// Check if this tip can be removed from the list.
// If the list is shared then we should not remove.
if(shouldRemove){
iter.remove();
}
if (!tip.hasRunOnMachine(ttStatus.getHost(),
ttStatus.getTrackerName())) {
return tip;
}
} else {
if (shouldRemove && tip.hasRunOnMachine(ttStatus.getHost(),
ttStatus.getTrackerName())) {
iter.remove();
}
}
}
return null;
}

选择依据是!tip.isRunning() || !tip.isRunnable(),即该Task处于运行中,且未完成,才能对此启动推测执行任务。

这里简单介绍下推测执行任务,推测执行任务是Hadoop的一种容错机制,即如果一个Task运行的时间同其他同类的Task所需的时间长很多(且还未完成)时,则根据实际情况考虑启动一个同样的Task,这时集群中就有两个同样的任务同时运行,哪个先完成则提交哪个Task,而kill掉另外一个Task。推测执行任务虽然能够能够更好的保证一个Task在正常时间内完成,但是代价是需要消耗更多的资源。

下面是调用hasSpeculativeTask()方法判断该Task是否可以启动一个推测执行任务。

boolean hasSpeculativeTask(long currentTime, double averageProgress) {
//
// REMIND - mjc - these constants should be examined
// in more depth eventually...
// if (!skipping && activeTasks.size() <= MAX_TASK_EXECS &&
(averageProgress - progress >= SPECULATIVE_GAP) &&
(currentTime - startTime >= SPECULATIVE_LAG)
&& completes == 0 && !isOnlyCommitPending()) {
return true;
}
return false;
}

判断条件有点多,主要是:1)skipping==false,即未开启跳过模式;2)该Task正在运行的任务数是否大于MAX_TASK_EXECS(1),大于MAX_TASK_EXECS则表示已经有一个推测执行的任务在运行了;3)averageProgress(Map任务或者Reduce任务完成的进度)是否大于SPECULATIVE_GAP(20%);4)任务运行的时间是否已超过SPECULATIVE_LAG(60*1000);5)Task的完成数==0;6)Task是否处于等待提交状态。

最后调用TaskInProgress的hasRunOnMachine()方法判断该Task是否在该TT上有正在运行中的TaskAttempt,且在该TT上是否有失败过。到这里findSpeculativeTask()方法就完成了。该方法首先根据Task的运行状态判断是否满足推测执行的条件,然后根据Task的一系列属性判断是否开启推测执行,最后根据该Task在该TT是否有正在运行的TaskAttempt以及是否有过失败记录最终决定是否在该TT上运行该Task的推测执行任务。继续回到JobInProgress.findNewMapTask()

10.JobInProgress.findNewMapTask():

// 2. Check breadth-wise for speculative tasks

      for (Node parent : nodesAtMaxLevel) {
// ignore the parent which is already scanned
if (parent == nodeParentAtMaxLevel) {
continue;
} Set<TaskInProgress> cache = runningMapCache.get(parent);
if (cache != null) {
tip = findSpeculativeTask(cache, tts, avgProgress,
currentTime, false);
if (tip != null) {
// remove empty cache entries
if (cache.size() == 0) {
runningMapCache.remove(parent);
}
LOG.info("Choosing a non-local task " + tip.getTIPId()
+ " for speculation");
return tip.getIdWithinJob();
}
}
} // 3. Check non-local tips for speculation
tip = findSpeculativeTask(nonLocalRunningMaps, tts, avgProgress,
currentTime, false);
if (tip != null) {
LOG.info("Choosing a non-local task " + tip.getTIPId()
+ " for speculation");
return tip.getIdWithinJob();
}
}

这里的两种方式其实跟上面一样,无需过多的解释。

到这里findNewMapTask()就完成了,下面回到obtainNewMapTaskCommon()方法。

11.JobInProgress.obtainNewMapTaskCommon():

int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxCacheLevel,
status.mapProgress());
if (target == -1) {
return null;
} Task result = maps[target].getTaskToRun(tts.getTrackerName());
if (result != null) {
addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
// DO NOT reset for off-switch!
if (maxCacheLevel != NON_LOCAL_CACHE_LEVEL) {
resetSchedulingOpportunities();
}
}
return result;

当调用findNewMapTask()方法得到一个maps[]数组的索引之后,就可以从maps[]数组中获取对应的Map任务。这里调用了一个TaskInProgress的getTaskToRun()方法,为Task生成一个唯一的AttemptId,然后调用addRunningTask()方法创建一个Task对象,方法内部还是比较简单的,主要是new一个Task对象,并为其创建一个TaskStatus对象,以及初始化一些参数值。

如果Task!=null,则调用addRunningTaskToTIP()方法处理一些记录值,如记录Task的locality值,以及是否第一个TaskAttempt对象,等等。

到此obtainNewMapTaskCommon()方法就完成了,则obtainNewNodeLocalMapTask(),obtainNewNodeOrRackLocalMapTask(),obtainNewMapTask()三个方法也就都完成了。而obtainNewReduceTask()该方法基本和前面三个方法大同小异,也就不需要过多的解释,不同的只是Reduce任务不需要考虑本地性,选择相对更简单些。

以上就是一个Job如何选择一个Map/Reduce任务来执行的过程,总体上来看对于Map任务需要考虑Map任务的本地性,以提高执行效率。而任务的选择顺序依次是:失败的任务>未运行的任务>推测执行任务。而对于Map任务第二三种任务(未运行的任务>推测执行任务)又分成从TT所在的Node自下而上选择、从根节点横向选择、选择No-Local任务三种不同的方式。

OK,以上如有错误之处还望指出,谢谢!

上一篇:Linux 防火墙管理及操作


下一篇:java提高篇(三十)-----Iterator