Analysis of Delay Scheduling in the Fair Scheduler

  The main purpose of delay scheduling is to improve data locality and reduce the amount of data transferred over the network. For a MapTask whose input data is not on the local node, the scheduler delays scheduling it and instead assigns the slot to a MapTask that does have local data.

  The general idea of delay scheduling is as follows:

  If the job can find a node-local MapTask, that task is returned; if not, scheduling is delayed: within the nodeLocalityDelay window, the scheduler keeps trying to find a node-local MapTask to return.

  Otherwise, once the wait exceeds nodeLocalityDelay, the scheduler looks for a rack-local MapTask to return; if none is found, scheduling is delayed again: within the rackLocalityDelay window, the scheduler keeps trying to find a rack-local MapTask to return.

  Otherwise, once the wait exceeds nodeLocalityDelay + rackLocalityDelay, the scheduler looks for an off-switch MapTask to return.

  The main variables in FairScheduler.java related to delay scheduling (the last three live in the per-job JobInfo inner class):

long nodeLocalityDelay;              // maximum time to wait for a node-local task before falling back to rack-local
long rackLocalityDelay;              // maximum additional time to wait for a rack-local task before falling back to off-switch
boolean skippedAtLastHeartbeat;      // whether the job was delay-scheduled (skipped) at the last heartbeat
long timeWaitedForLocalMap;          // time the job has waited since its last MapTask was assigned
LocalityLevel lastMapLocalityLevel;  // locality level of the last MapTask assigned to the job

// By default (when not set explicitly in the configuration), both delays are
// auto-computed from the heartbeat interval, capped at 15 seconds:
nodeLocalityDelay = rackLocalityDelay =
    Math.min(15000, (long) (1.5 * jobTracker.getNextHeartbeatInterval()));
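
  To put numbers on this: with an assumed JobTracker heartbeat interval of 3 seconds (an illustrative value; the actual interval depends on cluster size), both delays work out to 4.5 seconds:

// Illustrative arithmetic only; the 3000 ms heartbeat interval is an assumption.
long heartbeatInterval = 3000;
long delay = Math.min(15000, (long) (1.5 * heartbeatInterval)); // = 4500 ms
// So a job waits up to 1.5 heartbeat intervals before dropping one locality
// level, and up to 9000 ms in total before off-switch tasks are allowed.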

  

  In the Fair Scheduler, each job maintains two variables used to implement delay scheduling: the locality level of the last MapTask that was scheduled (lastMapLocalityLevel) and the time the job has waited since it was last skipped (timeWaitedForLocalMap). The workflow is as follows (the actual work is done in the getAllowedLocalityLevel() method of FairScheduler.java):

/**
 * Get the maximum locality level at which a given job is allowed to
 * launch tasks, based on how long it has been waiting for local tasks.
 * This is used to implement the "delay scheduling" feature of the Fair
 * Scheduler for optimizing data locality.
 * If the job has no locality information (e.g. it does not use HDFS), this
 * method returns LocalityLevel.ANY, allowing tasks at any level.
 * Otherwise, the job can only launch tasks at its current locality level
 * or lower, unless it has waited at least nodeLocalityDelay or
 * rackLocalityDelay milliseconds depends on the current level. If it
 * has waited (nodeLocalityDelay + rackLocalityDelay) milliseconds,
 * it can go to any level.
 */
protected LocalityLevel getAllowedLocalityLevel(JobInProgress job,
    long currentTime) {
  JobInfo info = infos.get(job);
  if (info == null) { // Job not in infos (shouldn't happen)
    LOG.error("getAllowedLocalityLevel called on job " + job
        + ", which does not have a JobInfo in infos");
    return LocalityLevel.ANY;
  }
  if (job.nonLocalMaps.size() > 0) { // Job doesn't have locality information
    return LocalityLevel.ANY;
  }
  // Don't wait for locality if the job's pool is starving for maps
  Pool pool = poolMgr.getPool(job);
  PoolSchedulable sched = pool.getMapSchedulable();
  long minShareTimeout = poolMgr.getMinSharePreemptionTimeout(pool.getName());
  long fairShareTimeout = poolMgr.getFairSharePreemptionTimeout();
  if (currentTime - sched.getLastTimeAtMinShare() > minShareTimeout ||
      currentTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
    eventLog.log("INFO", "No delay scheduling for "
        + job.getJobID() + " because it is being starved");
    return LocalityLevel.ANY;
  }
  // In the common case, compute locality level based on time waited
  switch (info.lastMapLocalityLevel) {
  case NODE: // Last task launched was node-local
    if (info.timeWaitedForLocalMap >=
        nodeLocalityDelay + rackLocalityDelay)
      return LocalityLevel.ANY;
    else if (info.timeWaitedForLocalMap >= nodeLocalityDelay)
      return LocalityLevel.RACK;
    else
      return LocalityLevel.NODE;
  case RACK: // Last task launched was rack-local
    if (info.timeWaitedForLocalMap >= rackLocalityDelay)
      return LocalityLevel.ANY;
    else
      return LocalityLevel.RACK;
  default: // Last task was non-local; can launch anywhere
    return LocalityLevel.ANY;
  }
}

getAllowedLocalityLevel()

1. If lastMapLocalityLevel is NODE:

1) if timeWaitedForLocalMap >= nodeLocalityDelay + rackLocalityDelay, MapTasks at the off-switch level and below may be scheduled;

2) if timeWaitedForLocalMap >= nodeLocalityDelay, MapTasks at the rack-local level and below may be scheduled;

3) otherwise, only node-local MapTasks may be scheduled.

2. If lastMapLocalityLevel is RACK:

1) if timeWaitedForLocalMap >= rackLocalityDelay, MapTasks at the off-switch level and below may be scheduled;

2) otherwise, MapTasks at the rack-local level and below may be scheduled.

3. Otherwise, MapTasks at the off-switch level and below may be scheduled.
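
  These rules form a small state machine. The sketch below (illustrative only, not scheduler code; it reuses the 4500 ms delay from the earlier example) reproduces the same decision logic and prints the allowed level at increasing wait times:

// A minimal, self-contained sketch of the fallback rules above; the enum and
// variable names mirror FairScheduler, but this class is not part of Hadoop.
public class DelaySchedulingDemo {
  enum LocalityLevel { NODE, RACK, ANY }

  // Same logic as the switch in getAllowedLocalityLevel()
  static LocalityLevel allowedLevel(LocalityLevel lastMapLocalityLevel,
      long timeWaitedForLocalMap, long nodeLocalityDelay,
      long rackLocalityDelay) {
    switch (lastMapLocalityLevel) {
    case NODE:
      if (timeWaitedForLocalMap >= nodeLocalityDelay + rackLocalityDelay)
        return LocalityLevel.ANY;
      else if (timeWaitedForLocalMap >= nodeLocalityDelay)
        return LocalityLevel.RACK;
      else
        return LocalityLevel.NODE;
    case RACK:
      return timeWaitedForLocalMap >= rackLocalityDelay
          ? LocalityLevel.ANY : LocalityLevel.RACK;
    default:
      return LocalityLevel.ANY;
    }
  }

  public static void main(String[] args) {
    long delay = 4500; // assumed min(15000, 1.5 * 3000) from the example above
    // A job whose last map was node-local, at increasing wait times:
    for (long waited : new long[] { 0, 4500, 9000 }) {
      System.out.println("waited " + waited + " ms -> "
          + allowedLevel(LocalityLevel.NODE, waited, delay, delay));
    }
    // Prints NODE, then RACK, then ANY
  }
}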

  The overall workflow of delay scheduling is as follows (the actual work is done in the assignTasks() method of FairScheduler.java):

@Override
public synchronized List<Task> assignTasks(TaskTracker tracker)
    throws IOException {
  if (!initialized) // Don't try to assign tasks if we haven't yet started up
    return null;
  String trackerName = tracker.getTrackerName();
  eventLog.log("HEARTBEAT", trackerName);
  long currentTime = clock.getTime();

  // Compute total runnable maps and reduces, and currently running ones
  int runnableMaps = 0;
  int runningMaps = 0;
  int runnableReduces = 0;
  int runningReduces = 0;
  for (Pool pool: poolMgr.getPools()) {
    runnableMaps += pool.getMapSchedulable().getDemand();
    runningMaps += pool.getMapSchedulable().getRunningTasks();
    runnableReduces += pool.getReduceSchedulable().getDemand();
    runningReduces += pool.getReduceSchedulable().getRunningTasks();
  }

  ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
  // Compute total map/reduce slots
  // In the future we can precompute this if the Scheduler becomes a
  // listener of tracker join/leave events.
  int totalMapSlots = getTotalSlots(TaskType.MAP, clusterStatus);
  int totalReduceSlots = getTotalSlots(TaskType.REDUCE, clusterStatus);

  eventLog.log("RUNNABLE_TASKS",
      runnableMaps, runningMaps, runnableReduces, runningReduces);

  // Update time waited for local maps for jobs skipped on last heartbeat
  // Note 1
  updateLocalityWaitTimes(currentTime);

  // Check for JT safe-mode
  if (taskTrackerManager.isInSafeMode()) {
    LOG.info("JobTracker is in safe-mode, not scheduling any tasks.");
    return null;
  }

  TaskTrackerStatus tts = tracker.getStatus();

  int mapsAssigned = 0; // loop counter for map in the below while loop
  int reducesAssigned = 0; // loop counter for reduce in the below while
  int mapCapacity = maxTasksToAssign(TaskType.MAP, tts);
  int reduceCapacity = maxTasksToAssign(TaskType.REDUCE, tts);
  boolean mapRejected = false; // flag used for ending the loop
  boolean reduceRejected = false; // flag used for ending the loop

  // Keep track of which jobs were visited for map tasks and which had tasks
  // launched, so that we can later mark skipped jobs for delay scheduling
  Set<JobInProgress> visitedForMap = new HashSet<JobInProgress>();
  Set<JobInProgress> visitedForReduce = new HashSet<JobInProgress>();
  Set<JobInProgress> launchedMap = new HashSet<JobInProgress>();

  ArrayList<Task> tasks = new ArrayList<Task>();

  // Scan jobs to assign tasks until neither maps nor reduces can be assigned
  // Note 2
  while (true) {
    // Computing the ending conditions for the loop
    // Reject a task type if one of the following condition happens
    // 1. number of assigned tasks reaches per-heartbeat limit
    // 2. number of running tasks reaches runnable tasks
    // 3. task is rejected by the LoadManager.canAssign
    if (!mapRejected) {
      if (mapsAssigned == mapCapacity ||
          runningMaps == runnableMaps ||
          !loadMgr.canAssignMap(tts, runnableMaps,
              totalMapSlots, mapsAssigned)) {
        eventLog.log("INFO", "Can't assign another MAP to " + trackerName);
        mapRejected = true;
      }
    }
    if (!reduceRejected) {
      if (reducesAssigned == reduceCapacity ||
          runningReduces == runnableReduces ||
          !loadMgr.canAssignReduce(tts, runnableReduces,
              totalReduceSlots, reducesAssigned)) {
        eventLog.log("INFO", "Can't assign another REDUCE to " + trackerName);
        reduceRejected = true;
      }
    }
    // Exit while (true) loop if
    // 1. neither maps nor reduces can be assigned
    // 2. assignMultiple is off and we already assigned one task
    if (mapRejected && reduceRejected ||
        !assignMultiple && tasks.size() > 0) {
      break; // This is the only exit of the while (true) loop
    }

    // Determine which task type to assign this time
    // First try choosing a task type which is not rejected
    TaskType taskType;
    if (mapRejected) {
      taskType = TaskType.REDUCE;
    } else if (reduceRejected) {
      taskType = TaskType.MAP;
    } else {
      // If both types are available, choose the task type with fewer running
      // tasks on the task tracker to prevent that task type from starving
      if (tts.countMapTasks() + mapsAssigned <=
          tts.countReduceTasks() + reducesAssigned) {
        taskType = TaskType.MAP;
      } else {
        taskType = TaskType.REDUCE;
      }
    }

    // Get the map or reduce schedulables and sort them by fair sharing
    List<PoolSchedulable> scheds = getPoolSchedulables(taskType);
    // Sort the jobs
    Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator());
    boolean foundTask = false;
    // Note 3
    for (Schedulable sched: scheds) { // This loop will assign only one task
      eventLog.log("INFO", "Checking for " + taskType +
          " task in " + sched.getName());
      // Note 4
      Task task = taskType == TaskType.MAP ?
          sched.assignTask(tts, currentTime, visitedForMap) :
          sched.assignTask(tts, currentTime, visitedForReduce);
      if (task != null) {
        foundTask = true;
        JobInProgress job = taskTrackerManager.getJob(task.getJobID());
        eventLog.log("ASSIGN", trackerName, taskType,
            job.getJobID(), task.getTaskID());
        // Update running task counts, and the job's locality level
        if (taskType == TaskType.MAP) {
          launchedMap.add(job);
          mapsAssigned++;
          runningMaps++;
          // Note 5
          updateLastMapLocalityLevel(job, task, tts);
        } else {
          reducesAssigned++;
          runningReduces++;
        }
        // Add task to the list of assignments
        tasks.add(task);
        break; // This break makes this loop assign only one task
      } // end if (task != null)
    } // end for (Schedulable sched: scheds)

    // Reject the task type if we cannot find a task
    if (!foundTask) {
      if (taskType == TaskType.MAP) {
        mapRejected = true;
      } else {
        reduceRejected = true;
      }
    }
  } // end while (true)

  // Mark any jobs that were visited for map tasks but did not launch a task
  // as skipped on this heartbeat
  for (JobInProgress job: visitedForMap) {
    if (!launchedMap.contains(job)) {
      infos.get(job).skippedAtLastHeartbeat = true;
    }
  }

  // If no tasks were found, return null
  return tasks.isEmpty() ? null : tasks;
}

assignTasks()

  Note 1: updateLocalityWaitTimes(). For every job that was skipped at the last heartbeat, this method adds the time elapsed since that heartbeat to timeWaitedForLocalMap, and resets skippedAtLastHeartbeat to false. The code is as follows:

/**
 * Update locality wait times for jobs that were skipped at last heartbeat.
 */
private void updateLocalityWaitTimes(long currentTime) {
  long timeSinceLastHeartbeat =
      (lastHeartbeatTime == 0 ? 0 : currentTime - lastHeartbeatTime);
  lastHeartbeatTime = currentTime;
  for (JobInfo info: infos.values()) {
    if (info.skippedAtLastHeartbeat) {
      info.timeWaitedForLocalMap += timeSinceLastHeartbeat;
      info.skippedAtLastHeartbeat = false;
    }
  }
}

updateLocalityWaitTimes()
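
  The interaction between this method and skippedAtLastHeartbeat is easy to trace with a toy simulation. The standalone sketch below (not scheduler code; it assumes the 3-second heartbeat interval used earlier) shows a job that keeps being skipped accumulating wait time until it is allowed to fall back to rack-local:

// Standalone simulation of timeWaitedForLocalMap accumulating across
// heartbeats; the 3000 ms interval and 4500 ms delay are assumptions.
public class WaitTimeDemo {
  public static void main(String[] args) {
    long nodeLocalityDelay = 4500;
    long heartbeatInterval = 3000;
    long timeWaitedForLocalMap = 0;
    boolean skippedAtLastHeartbeat = false; // job has not been skipped yet
    for (int hb = 1; hb <= 3; hb++) {
      // What updateLocalityWaitTimes() does at the start of each heartbeat:
      if (skippedAtLastHeartbeat) {
        timeWaitedForLocalMap += heartbeatInterval;
      }
      System.out.println("heartbeat " + hb + ": waited "
          + timeWaitedForLocalMap + " ms, rack-local allowed: "
          + (timeWaitedForLocalMap >= nodeLocalityDelay));
      skippedAtLastHeartbeat = true; // assume no node-local task was found
    }
    // By heartbeat 3 the job has waited 6000 ms >= 4500 ms, so the
    // allowed level drops from NODE to RACK.
  }
}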

  Note 2: the while (true) loop keeps assigning MapTasks and ReduceTasks until none can be assigned. Within the loop, all jobs are first sorted; the actual MapTask assignment then happens in a for() loop. (Schedulable has two subclasses, PoolSchedulable and JobSchedulable; the Schedulable here can be treated as a job.)

  Notes 3 and 4: in the for() loop, the assignTask() method of JobSchedulable is called to pick a suitable MapTask or ReduceTask. When picking a MapTask, it first calls FairScheduler.getAllowedLocalityLevel() to determine the highest locality level at which a MapTask may be scheduled (see the analysis above), and then selects a MapTask of the corresponding level based on the return value. The assignTask() code is as follows:

@Override
public Task assignTask(TaskTrackerStatus tts, long currentTime,
    Collection<JobInProgress> visited) throws IOException {
  if (isRunnable()) {
    visited.add(job);
    TaskTrackerManager ttm = scheduler.taskTrackerManager;
    ClusterStatus clusterStatus = ttm.getClusterStatus();
    int numTaskTrackers = clusterStatus.getTaskTrackers();

    // check with the load manager whether it is safe to
    // launch this task on this taskTracker.
    LoadManager loadMgr = scheduler.getLoadManager();
    if (!loadMgr.canLaunchTask(tts, job, taskType)) {
      return null;
    }
    if (taskType == TaskType.MAP) {
      // Determine the locality level at which we may schedule
      LocalityLevel localityLevel = scheduler.getAllowedLocalityLevel(
          job, currentTime);
      scheduler.getEventLog().log(
          "ALLOWED_LOC_LEVEL", job.getJobID(), localityLevel);
      switch (localityLevel) {
      case NODE:
        return job.obtainNewNodeLocalMapTask(tts, numTaskTrackers,
            ttm.getNumberOfUniqueHosts());
      case RACK:
        return job.obtainNewNodeOrRackLocalMapTask(tts, numTaskTrackers,
            ttm.getNumberOfUniqueHosts());
      default:
        return job.obtainNewMapTask(tts, numTaskTrackers,
            ttm.getNumberOfUniqueHosts());
      }
    } else {
      return job.obtainNewReduceTask(tts, numTaskTrackers,
          ttm.getNumberOfUniqueHosts());
    }
  } else {
    return null;
  }
}

assignTask()

  As the code shows, this method in turn calls the JobInProgress method matching the allowed level to obtain a MapTask at that level (note that for the RACK level, obtainNewNodeOrRackLocalMapTask() accepts node-local as well as rack-local tasks, so better locality is still preferred when available).

  Note 5: finally, updateLastMapLocalityLevel() updates the job's bookkeeping: lastMapLocalityLevel is set to the locality level of the task that was just launched, and timeWaitedForLocalMap is reset to 0.

/**
 * Update a job's locality level and locality wait variables given that that
 * it has just launched a map task on a given task tracker.
 */
private void updateLastMapLocalityLevel(JobInProgress job,
    Task mapTaskLaunched, TaskTrackerStatus tracker) {
  JobInfo info = infos.get(job);
  boolean isNodeGroupAware = conf.getBoolean(
      "net.topology.nodegroup.aware", false);
  LocalityLevel localityLevel = LocalityLevel.fromTask(
      job, mapTaskLaunched, tracker, isNodeGroupAware);
  info.lastMapLocalityLevel = localityLevel;
  info.timeWaitedForLocalMap = 0;
  eventLog.log("ASSIGNED_LOC_LEVEL", job.getJobID(), localityLevel);
}

updateLastMapLocalityLevel()

  This article is based on Hadoop 1.2.1. Corrections are welcome if you spot any errors.

  References: Dong Xicheng (董西成), 《Hadoop技术内幕:深入理解MapReduce架构设计与实现原理》 (Hadoop Internals: In-Depth Study of MapReduce Architecture Design and Implementation)

    https://issues.apache.org/jira/secure/attachment/12457515/fair_scheduler_design_doc.pdf

  Please credit the source when reposting: http://www.cnblogs.com/gwgyk/p/4568270.html
