Preface:
I have recently been studying how Hadoop executes jobs. After reading a large amount of material I had an intuitive feel for the flow, but I still lacked a complete picture of how MapReduce actually runs, so I decided to analyze the MapReduce execution flow at the source-code level.
Prelude:
Let us start with job submission. When using the Job class, the statement that triggers submission is
job.waitForCompletion(true), where passing true means the job's progress is printed while it runs.
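For context, here is a minimal driver sketch showing where this call sits. The class name MyDriver and the /in and /out paths are invented, and the identity Mapper/Reducer base classes stand in for real implementations:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "my-job");       // constructor form used in Hadoop 1.x
    job.setJarByClass(MyDriver.class);
    job.setMapperClass(Mapper.class);        // identity mapper, enough for a sketch
    job.setReducerClass(Reducer.class);      // identity reducer
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path("/in"));    // hypothetical paths
    FileOutputFormat.setOutputPath(job, new Path("/out"));
    // The trigger discussed above: submit() runs inside this call, then the
    // client polls until the job finishes, printing progress because we pass true.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}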
Pressing F3 on this method in Eclipse takes us to its source: it actually calls Job's submit() method, which in turn calls submitJobInternal(conf) to submit the job. That method uploads three files (job.jar, job.split, and job.xml) to the HDFS file system:
ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
  public RunningJob run() throws FileNotFoundException, ClassNotFoundException,
                                 InterruptedException, IOException {
    JobConf jobCopy = job;
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,
                                                           jobCopy);
    // Get the job ID
    JobID jobId = jobSubmitClient.getNewJobId();
    // Directory the job is uploaded (staged) to
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
    JobStatus status = null;
    try {
      populateTokenCache(jobCopy, jobCopy.getCredentials());

      copyAndConfigureFiles(jobCopy, submitJobDir);

      // get delegation token for the dir (from the namenode)
      TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),
                                          new Path[] { submitJobDir }, jobCopy);

      // Path of the job configuration file
      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      // Number of reduce tasks for the job (defaults to 1)
      int reduces = jobCopy.getNumReduceTasks();
      // Record the submitting host's address and name
      InetAddress ip = InetAddress.getLocalHost();
      if (ip != null) {
        job.setJobSubmitHostAddress(ip.getHostAddress());
        job.setJobSubmitHostName(ip.getHostName());
      }
      JobContext context = new JobContext(jobCopy, jobId);
      jobCopy = (JobConf) context.getConfiguration();

      // Check the output specification
      if (reduces == 0 ? jobCopy.getUseNewMapper() : jobCopy.getUseNewReducer()) {
        org.apache.hadoop.mapreduce.OutputFormat<?, ?> output =
            ReflectionUtils.newInstance(context.getOutputFormatClass(), jobCopy);
        output.checkOutputSpecs(context);
      } else {
        jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
      }

      // Create the file splits
      FileSystem fs = submitJobDir.getFileSystem(jobCopy);
      LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
      int maps = writeSplits(context, submitJobDir);
      jobCopy.setNumMapTasks(maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      String queue = jobCopy.getQueueName();
      AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);
      jobCopy.set(QueueManager.toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());

      // Write job file to JobTracker's fs
      FSDataOutputStream out =
          FileSystem.create(fs, submitJobFile,
              new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
      try {
        // Write out the job.xml file
        jobCopy.writeXml(out);
      } finally {
        out.close();
      }

      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, jobCopy.getCredentials());
      // Submit the job through the RPC proxy
      status = jobSubmitClient.submitJob(
          jobId, submitJobDir.toString(), jobCopy.getCredentials());
      if (status != null) {
        return new NetworkedJob(status);
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (fs != null && submitJobDir != null)
          fs.delete(submitJobDir, true);
      }
    }
  }
});
Looking at this method we can see where the job's staging path and job ID come from; the job is then submitted through the proxy (RPC) mechanism.
After the job is submitted, the JobTracker creates an instance of the JobInProgress class, which represents all of the job's state and the actions the job needs to perform. Once the JobTracker has received the submitted data, it assigns tasks to TaskTrackers according to the job's configuration, and only notifies the JobClient after all the TaskTrackers have finished. The JobTracker first adds the job to a queue called jobInitQueue; a JobQueueTaskScheduler object inside the JobTracker polls this queue, and whenever a new job arrives it takes the job out and initializes it. Each task also gets a corresponding TaskInProgress object. Once a TaskTracker has started, it communicates with the JobTracker through a heartbeat mechanism.
Every three seconds each TaskTracker sends the JobTracker a HeartBeat carrying a great deal of information about that TaskTracker. When the JobTracker receives a heartbeat, it inspects the information the heartbeat carries and, if it finds errors, starts the corresponding error-handling routine. If the TaskTracker asked for tasks in its heartbeat, the JobTracker attaches the corresponding instructions to its reply. In the Hadoop source code, an instruction from the JobTracker to a TaskTracker is called an action, and the JobTracker's reply to a TaskTracker's heartbeat is called a HeartbeatResponse.
Inside the TaskTracker there is a queue called the TaskQueue, which holds all newly arrived tasks. Whenever the TaskTracker receives a HeartbeatResponse it inspects it and, if it contains new tasks, adds them to the TaskQueue. Two threads keep polling this queue: MapLauncher and ReduceLauncher. When a new map task appears, MapLauncher takes it out and runs it; when a reduce task appears, ReduceLauncher does the same.
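Both launchers follow the standard blocking producer/consumer pattern. The sketch below only illustrates the idea and is not Hadoop's actual TaskLauncher code; the task type is reduced to a plain Runnable:

import java.util.LinkedList;

// Stripped-down illustration of the MapLauncher/ReduceLauncher idea:
// the heartbeat thread enqueues tasks, the launcher thread dequeues and runs them.
class SimpleTaskLauncher extends Thread {
  private final LinkedList<Runnable> tasksToLaunch = new LinkedList<Runnable>();

  // Called by the heartbeat-processing thread when a launch action arrives.
  public void addToTaskQueue(Runnable task) {
    synchronized (tasksToLaunch) {
      tasksToLaunch.add(task);
      tasksToLaunch.notifyAll();     // wake the launcher thread
    }
  }

  @Override
  public void run() {
    while (!isInterrupted()) {
      Runnable task;
      synchronized (tasksToLaunch) {
        while (tasksToLaunch.isEmpty()) {
          try {
            tasksToLaunch.wait();    // block until a new task is enqueued
          } catch (InterruptedException e) {
            return;
          }
        }
        task = tasksToLaunch.removeFirst();
      }
      task.run();  // in Hadoop this would localize the task and spawn a child JVM
    }
  }
}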
Whether it is a map task or a reduce task, once a task is taken off the queue it must first be localized. Localization means copying everything the task needs, such as the jar file to run, the configuration file, and the input data, to the local file system, so that the task can execute independently on that machine. After localization, the TaskTracker creates a separate JVM for each task and runs the task in it. When a task finishes, the TaskTracker notifies the JobTracker so that the next step can proceed.
When all the tasks have completed, the job is complete, and the JobTracker notifies the JobClient that the work is done.
Code walkthrough:
When we run bin/start-all.sh, we can see from the script that it invokes three other scripts: hadoop-config.sh, start-dfs.sh, and start-mapred.sh. Hadoop starts the JobTracker and the TaskTrackers according to a series of configuration files; the master logs in to the slave machines over SSH to start the tasktracker and datanode daemons.
The flow is analyzed below against the Hadoop source code.
First, the JobTracker and the TaskTracker.
Each JobTracker corresponds to an instance of the org.apache.hadoop.mapred.JobTracker class, which is mainly responsible for accepting jobs, scheduling them, and monitoring the TaskTrackers. Each JobTracker runs in its own JVM. A JobTracker is started through the startTracker() method. Source code:
/**
 * Start the JobTracker with given configuration.
 *
 * The conf will be modified to reflect the actual ports on which
 * the JobTracker is up and running if the user passes the port as
 * <code>zero</code>.
 *
 * @param conf configuration for the JobTracker.
 * @throws IOException
 */
public static JobTracker startTracker(JobConf conf, String identifier)
    throws IOException, InterruptedException {
  DefaultMetricsSystem.initialize("JobTracker");
  JobTracker result = null;
  while (true) {
    try {
      // Instantiate the JobTracker (named result) and hand it to its scheduler
      result = new JobTracker(conf, identifier);
      result.taskScheduler.setTaskTrackerManager(result);
      break;
    } catch (VersionMismatch e) {
      throw e;
    } catch (BindException e) {
      throw e;
    } catch (UnknownHostException e) {
      throw e;
    } catch (AccessControlException ace) {
      // in case of jobtracker not having right access
      // bail out
      throw ace;
    } catch (IOException e) {
      LOG.warn("Error starting tracker: " + StringUtils.stringifyException(e));
    }
    Thread.sleep(1000);
  }
  if (result != null) {
    JobEndNotifier.startNotifier();
    MBeans.register("JobTracker", "JobTrackerInfo", result);
  }
  return result;
}
startTracker creates the JobTracker object from the conf configuration and then performs a series of initialization steps, including starting the RPC server, starting the embedded Jetty server, and checking whether the JobTracker needs to be restarted.
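To see how startTracker and offerService (below) fit together, here is a condensed sketch of JobTracker's main() entry point; this is not the verbatim 1.x source, and argument parsing and error handling are omitted:

// Condensed sketch of how startTracker() and offerService() are wired up
// in JobTracker.main().
public static void main(String argv[]) throws IOException, InterruptedException {
  StringUtils.startupShutdownMessage(JobTracker.class, argv, LOG);
  JobTracker tracker = startTracker(new JobConf());  // build and initialize
  tracker.offerService();                            // block and serve forever
}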
Another important method is offerService:
/**
 * Run forever
 */
public void offerService() throws InterruptedException, IOException {
  // Prepare for recovery. This is done irrespective of the status of restart
  // flag.
  while (true) {
    try {
      recoveryManager.updateRestartCount();
      break;
    } catch (IOException ioe) {
      LOG.warn("Failed to initialize recovery manager. ", ioe);
      // wait for some time
      Thread.sleep(FS_ACCESS_RETRY_PERIOD);
      LOG.warn("Retrying...");
    }
  }

  taskScheduler.start();

  // Start the recovery after starting the scheduler
  try {
    // Recover jobs left over from a previous JobTracker incarnation
    recoveryManager.recover();
  } catch (Throwable t) {
    LOG.warn("Recovery manager crashed! Ignoring.", t);
  }
  // refresh the node list as the recovery manager might have added
  // disallowed trackers
  refreshHosts();

  this.expireTrackersThread = new Thread(this.expireTrackers, "expireTrackers");
  this.expireTrackersThread.start();
  this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
  this.retireJobsThread.start();
  expireLaunchingTaskThread.start();

  if (completedJobStatusStore.isActive()) {
    completedJobsStoreThread = new Thread(completedJobStatusStore,
                                          "completedjobsStore-housekeeper");
    completedJobsStoreThread.start();
  }

  // start the inter-tracker server once the jt is ready
  this.interTrackerServer.start();

  synchronized (this) {
    state = State.RUNNING;
  }
  LOG.info("Starting RUNNING");

  this.interTrackerServer.join();
  LOG.info("Stopped interTrackerServer");
}
This method runs for the whole lifetime of the JobTracker. Although it opens with what looks like an infinite loop, that loop only retries the recovery-manager initialization until it succeeds; the method then starts the scheduler and the various housekeeping threads, and finally blocks on the RPC server. This is how scheduling is first set in motion.
The method calls the start() method of the taskScheduler object, a data member of the JobTracker. Its type provides a series of interfaces through which the JobTracker can initialize and schedule all submitted jobs. That type, however, is abstract; its concrete implementation is the JobQueueTaskScheduler class, so taskScheduler.start() actually executes JobQueueTaskScheduler's start method:
/**
 * Lifecycle method to allow the scheduler to start any work in separate
 * threads.
 * @throws IOException
 */
public void start() throws IOException {
  // do nothing
}

// And JobQueueTaskScheduler's start method:
public synchronized void start() throws IOException {
  super.start(); // does nothing, see above
  // Register the JobInProgress listener
  taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
  // Register and start the eager task-initialization listener
  eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
  eagerTaskInitializationListener.start();
  taskTrackerManager.addJobInProgressListener(
      eagerTaskInitializationListener);
}
JobQueueTaskScheduler's start method registers two very important listeners: jobQueueJobInProgressListener and eagerTaskInitializationListener. The former is an instance of JobQueueJobInProgressListener, which maintains a FIFO queue of JobInProgress objects and watches each instance for changes throughout its lifetime. The latter is an instance of EagerTaskInitializationListener, which keeps watching jobInitQueue; as soon as a new job is submitted (that is, a new JobInProgress instance is enqueued), it immediately calls that instance's initTasks method to initialize the job.
Next, look at the initTasks method of the JobInProgress class:
/**
 * Construct the splits, etc. This is invoked from an async
 * thread so that split-computation doesn't block anyone.
 */
public synchronized void initTasks()
    throws IOException, KillInterruptedException {
  if (tasksInited || isComplete()) {
    return;
  }
  synchronized (jobInitKillStatus) {
    if (jobInitKillStatus.killed || jobInitKillStatus.initStarted) {
      return; // init already started, or the job has been killed
    }
    jobInitKillStatus.initStarted = true;
  }

  LOG.info("Initializing " + jobId);
  final long startTimeFinal = this.startTime;
  // log job info as the user running the job
  try {
    // The job history entry is written as the submitting user
    userUGI.doAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile,
            startTimeFinal, hasRestarted());
        return null;
      }
    });
  } catch (InterruptedException ie) {
    throw new IOException(ie);
  }

  // log the job priority
  setPriority(this.priority);

  //
  // generate security keys needed by Tasks
  //
  generateAndStoreTokens();

  //
  // read input splits and create a map per a split
  //
  TaskSplitMetaInfo[] splits = createSplits(jobId);
  if (numMapTasks != splits.length) {
    throw new IOException("Number of maps in JobConf doesn't match number of " +
        "recieved splits for job " + jobId + "! " +
        "numMapTasks=" + numMapTasks + ", #splits=" + splits.length);
  }
  numMapTasks = splits.length;

  // Map and reduce counts are reported as waiting until slots become available
  jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
  jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
Next, the map tasks are initialized:
  maps = new TaskInProgress[numMapTasks];
  for (int i = 0; i < numMapTasks; ++i) {
    inputLength += splits[i].getInputDataLength();
    // One map TaskInProgress per split
    maps[i] = new TaskInProgress(jobId, jobFile, splits[i],
                                 jobtracker, conf, this, i, numSlotsPerMap);
  }
  LOG.info("Input size for job " + jobId + " = " + inputLength +
           ". Number of splits = " + splits.length);

  // Set localityWaitFactor before creating cache
  localityWaitFactor =
      conf.getFloat(LOCALITY_WAIT_FACTOR, DEFAULT_LOCALITY_WAIT_FACTOR);
  if (numMapTasks > 0) {
    // Create the cache of non-running maps, indexed by locality level
    nonRunningMapCache = createCache(splits, maxLevel);
  }

  // set the launch time
  this.launchTime = jobtracker.getClock().getTime();
The reduce tasks are created here:
  //
  // Create reduce tasks
  //
  this.reduces = new TaskInProgress[numReduceTasks];
  for (int i = 0; i < numReduceTasks; i++) {
    reduces[i] = new TaskInProgress(jobId, jobFile, numMapTasks, i,
                                    jobtracker, conf, this, numSlotsPerReduce);
    nonRunningReduces.add(reduces[i]);
  }
Here the minimum number of completed map tasks required before reduce tasks may be started is computed:
  completedMapsForReduceSlowstart =
      (int) Math.ceil(
          (conf.getFloat("mapred.reduce.slowstart.completed.maps",
                         DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) *
           numMapTasks));
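To make the formula concrete, assume the 1.x default of 0.05 for mapred.reduce.slowstart.completed.maps and a hypothetical job with 200 map tasks:

// Hypothetical numbers, just to illustrate the slow-start arithmetic.
int numMapTasks = 200;
float slowstart = 0.05f;  // assumed default of mapred.reduce.slowstart.completed.maps
int threshold = (int) Math.ceil(slowstart * numMapTasks);
// threshold == 10: reduces become schedulable after 10 maps have completed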
This threshold is handed to the estimator for the total map output, and the cleanup tasks are created next:
  resourceEstimator.setThreshhold(completedMapsForReduceSlowstart);

  // create cleanup two cleanup tips, one map and one reduce.
  cleanup = new TaskInProgress[2];

  // cleanup map tip. This map doesn't use any splits. Just assign an empty
  // split.
  TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
  cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
                                  jobtracker, conf, this, numMapTasks, 1);
  cleanup[0].setJobCleanupTask();

  // cleanup reduce tip.
  cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                                  numReduceTasks, jobtracker, conf, this, 1);
  cleanup[1].setJobCleanupTask();

  // create two setup tips, one map and one reduce.
  setup = new TaskInProgress[2];

  // setup map tip. This map doesn't use any split. Just assign an empty
  // split.
  setup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
                                jobtracker, conf, this, numMapTasks + 1, 1);
  setup[0].setJobSetupTask();

  // setup reduce tip.
  setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                                numReduceTasks + 1, jobtracker, conf, this, 1);
  setup[1].setJobSetupTask();

  synchronized (jobInitKillStatus) {
    jobInitKillStatus.initDone = true;
    if (jobInitKillStatus.killed) {
      throw new KillInterruptedException("Job " + jobId + " killed in init");
    }
  }

  tasksInited = true;
  JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime,
                               numMapTasks, numReduceTasks);

  // Log the number of map and reduce tasks
  LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks +
           " map tasks and " + numReduceTasks + " reduce tasks.");
}

From this code we can see that both map tasks and reduce tasks are TaskInProgress instances, and that the number of map TaskInProgress objects matches the number of splits.
Next comes the TaskTracker that runs on the worker (datanode) machines.
In Hadoop, each tasktracker corresponds to an instance of the org.apache.hadoop.mapred.TaskTracker class, which implements all of the tasktracker's functionality. Each TaskTracker likewise runs in its own JVM; in the Hadoop scripts it is started by bin/hadoop-daemon.sh start tasktracker.
First, TaskTracker's main function:
/**
 * Start the TaskTracker, point toward the indicated JobTracker
 */
public static void main(String argv[]) throws Exception {
  StringUtils.startupShutdownMessage(TaskTracker.class, argv, LOG);
  if (argv.length != 0) {
    System.out.println("usage: TaskTracker");
    System.exit(-1);
  }
  try {
    JobConf conf = new JobConf();
    // enable the server to track time spent waiting on locks
    ReflectionUtils.setContentionTracing
        (conf.getBoolean("tasktracker.contention.tracking", false));
    DefaultMetricsSystem.initialize("TaskTracker");
    TaskTracker tt = new TaskTracker(conf);
    MBeans.register("TaskTracker", "TaskTrackerInfo", tt);
    // The important statement: start the TaskTracker's main loop
    tt.run();
  } catch (Throwable e) {
    LOG.error("Can not start task tracker because " +
              StringUtils.stringifyException(e));
    System.exit(-1);
  }
}
The run method invokes the offerService() method:
public void run() {
  try {
    getUserLogManager().start();
    startCleanupThreads();
    boolean denied = false;
    while (running && !shuttingDown && !denied) {
      boolean staleState = false;
      try {
        // This while-loop attempts reconnects if we get network errors
        while (running && !staleState && !shuttingDown && !denied) {
          try {
            State osState = offerService();
            if (osState == State.STALE) {
              staleState = true;
            } else if (osState == State.DENIED) {
              denied = true;
            }
          } catch (Exception ex) {
            if (!shuttingDown) {
              LOG.info("Lost connection to JobTracker [" +
                       jobTrackAddr + "]. Retrying...", ex);
              try {
                Thread.sleep(5000);
              } catch (InterruptedException ie) {
              }
            }
          }
        }
      } finally {
        close();
      }
      if (shuttingDown) {
        return;
      }
      LOG.warn("Reinitializing local state");
      initialize();
    }
    if (denied) {
      shutdown();
    }
  } catch (IOException iex) {
    LOG.error("Got fatal exception while reinitializing TaskTracker: " +
              StringUtils.stringifyException(iex));
    return;
  } catch (InterruptedException i) {
    LOG.error("Got interrupted while reinitializing TaskTracker: " +
              i.getMessage());
    return;
  }
}
Now the offerService() method itself:
State offerService() throws Exception {
  // Time the last heartbeat was sent
  long lastHeartbeat = System.currentTimeMillis();

  while (running && !shuttingDown) {
    try {
      long now = System.currentTimeMillis();

      // accelerate to account for multiple finished tasks up-front
      synchronized (finishedCount) {
        long remaining =
            (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
        while (remaining > 0) {
          // sleeps for the wait time or
          // until there are *enough* empty slots to schedule tasks
          finishedCount.wait(remaining);

          // Recompute
          now = System.currentTimeMillis();
          remaining =
              (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
        }

        // Reset count
        finishedCount.set(0);
      }

      // If the TaskTracker is just starting up:
      // 1. Verify the buildVersion
      // 2. Get the system directory & filesystem
      if (justInited) {
        String jobTrackerBV = jobClient.getBuildVersion();
        if (!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {
          String msg = "Shutting down. Incompatible buildVersion." +
                       "\nJobTracker's: " + jobTrackerBV +
                       "\nTaskTracker's: " + VersionInfo.getBuildVersion();
          LOG.error(msg);
          try {
            jobClient.reportTaskTrackerError(taskTrackerName, null, msg);
          } catch (Exception e) {
            LOG.info("Problem reporting to jobtracker: " + e);
          }
          return State.DENIED;
        }

        String dir = jobClient.getSystemDir();
        if (dir == null) {
          throw new IOException("Failed to get system directory");
        }
        systemDirectory = new Path(dir);
        systemFS = systemDirectory.getFileSystem(fConf);
      }

      // Send the heartbeat and process the jobtracker's directives
      HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);

      // Note the time when the heartbeat returned, use this to decide when to send the
      // next heartbeat
      lastHeartbeat = System.currentTimeMillis();

      // Check if the map-event list needs purging
      Set<JobID> jobs = heartbeatResponse.getRecoveredJobs();
      if (jobs.size() > 0) {
        synchronized (this) {
          // purge the local map events list
          for (JobID job : jobs) {
            RunningJob rjob;
            synchronized (runningJobs) {
              rjob = runningJobs.get(job);
              if (rjob != null) {
                synchronized (rjob) {
                  FetchStatus f = rjob.getFetchStatus();
                  if (f != null) {
                    f.reset();
                  }
                }
              }
            }
          }

          // Mark the reducers in shuffle for rollback
          synchronized (shouldReset) {
            for (Map.Entry<TaskAttemptID, TaskInProgress> entry
                 : runningTasks.entrySet()) {
              if (entry.getValue().getStatus().getPhase() == Phase.SHUFFLE) {
                this.shouldReset.add(entry.getKey());
              }
            }
          }
        }
      }
The returned heartbeatResponse carries the JobTracker's directives:
      TaskTrackerAction[] actions = heartbeatResponse.getActions();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Got heartbeatResponse from JobTracker with responseId: " +
                  heartbeatResponse.getResponseId() + " and " +
                  ((actions != null) ? actions.length : 0) + " actions");
      }
      if (reinitTaskTracker(actions)) {
        return State.STALE;
      }

      // resetting heartbeat interval from the response.
      heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
      justStarted = false;
      justInited = false;
The actions are then executed:
      if (actions != null) {
        for (TaskTrackerAction action : actions) {
          if (action instanceof LaunchTaskAction) {
            // Add the new task to the launch queue
            addToTaskQueue((LaunchTaskAction) action);
          } else if (action instanceof CommitTaskAction) {
            CommitTaskAction commitAction = (CommitTaskAction) action;
            if (!commitResponses.contains(commitAction.getTaskID())) {
              LOG.info("Received commit task action for " +
                       commitAction.getTaskID());
              commitResponses.add(commitAction.getTaskID());
            }
          } else {
            tasksToCleanup.put(action);
          }
        }
      }
      markUnresponsiveTasks();
      killOverflowingTasks();

      // we've cleaned up, resume normal operation
      if (!acceptNewTasks && isIdle()) {
        acceptNewTasks = true;
      }
      // The check below may not be required every iteration but we are
      // erring on the side of caution here. We have seen many cases where
      // the call to jetty's getLocalPort() returns different values at
      // different times. Being a real paranoid here.
      checkJettyPort(server.getPort());
    } catch (InterruptedException ie) {
      LOG.info("Interrupted. Closing down.");
      return State.INTERRUPTED;
    } catch (DiskErrorException de) {
      String msg = "Exiting task tracker for disk error:\n" +
                   StringUtils.stringifyException(de);
      LOG.error(msg);
      synchronized (this) {
        // Report the disk error to the JobTracker
        jobClient.reportTaskTrackerError(taskTrackerName,
                                         "DiskErrorException", msg);
      }
      return State.STALE;
    } catch (RemoteException re) {
      String reClass = re.getClassName();
      if (DisallowedTaskTrackerException.class.getName().equals(reClass)) {
        LOG.info("Tasktracker disallowed by JobTracker.");
        return State.DENIED;
      }
    } catch (Exception except) {
      String msg = "Caught exception: " +
                   StringUtils.stringifyException(except);
      LOG.error(msg);
    }
  }
  return State.NORMAL;
}
Each TaskTracker sends the JobTracker a heartbeat every three seconds. The heartbeat travels over Hadoop's RPC proxy mechanism, which is essentially a remote method invocation scheme; see other references for the details.
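Concretely, the "proxy" is a client-side stub for the InterTrackerProtocol interface, and each heartbeat is simply a method call on that stub, serialized to the JobTracker over the wire. A condensed sketch of the 1.x pattern follows; the local variable names are illustrative, not verbatim source:

// Inside TaskTracker (package org.apache.hadoop.mapred), roughly:
InetSocketAddress jobTrackAddr = JobTracker.getAddress(fConf);
InterTrackerProtocol jobClient =
    (InterTrackerProtocol) RPC.waitForProxy(
        InterTrackerProtocol.class, InterTrackerProtocol.versionID,
        jobTrackAddr, fConf);
// From here on, a heartbeat is just a method call on the proxy
// (argument names illustrative; see the signature below):
HeartbeatResponse response =
    jobClient.heartbeat(status, restarted, initialContact,
                        askForNewTask, heartbeatResponseId);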
Next, the JobTracker receives the heartbeat and assigns tasks to the TaskTracker. On receiving a heartbeat it calls the method heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId), whose return value is a HeartbeatResponse object:
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
                                                boolean restarted,
                                                boolean initialContact,
                                                boolean acceptNewTasks,
                                                short responseId)
    throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Got heartbeat from: " + status.getTrackerName() +
              " (restarted: " + restarted +
              " initialContact: " + initialContact +
              " acceptNewTasks: " + acceptNewTasks + ")" +
              " with responseId: " + responseId);
  }

  // Make sure heartbeat is from a tasktracker allowed by the jobtracker.
  if (!acceptTaskTracker(status)) {
    throw new DisallowedTaskTrackerException(status);
  }

  // First check if the last heartbeat response got through
  String trackerName = status.getTrackerName();
  long now = clock.getTime();
  if (restarted) {
    faultyTrackers.markTrackerHealthy(status.getHost());
  } else {
    faultyTrackers.checkTrackerFaultTimeout(status.getHost(), now);
  }

  HeartbeatResponse prevHeartbeatResponse =
      trackerToHeartbeatResponseMap.get(trackerName);
  boolean addRestartInfo = false;

  if (initialContact != true) {
    // If this isn't the 'initial contact' from the tasktracker,
    // there is something seriously wrong if the JobTracker has
    // no record of the 'previous heartbeat'; if so, ask the
    // tasktracker to re-initialize itself.
    if (prevHeartbeatResponse == null) {
      // This is the first heartbeat from the old tracker to the newly
      // started JobTracker
      if (hasRestarted()) {
        addRestartInfo = true;
        // inform the recovery manager about this tracker joining back
        recoveryManager.unMarkTracker(trackerName);
      } else {
        // Jobtracker might have restarted but no recovery is needed
        // otherwise this code should not be reached
        LOG.warn("Serious problem, cannot find record of 'previous' " +
                 "heartbeat for '" + trackerName +
                 "'; reinitializing the tasktracker");
        return new HeartbeatResponse(responseId,
            new TaskTrackerAction[] {new ReinitTrackerAction()});
      }
    } else {
      // It is completely safe to not process a 'duplicate' heartbeat from a
      // {@link TaskTracker} since it resends the heartbeat when rpcs are
      // lost see {@link TaskTracker.transmitHeartbeat()};
      // acknowledge it by re-sending the previous response to let the
      // {@link TaskTracker} go forward.
      if (prevHeartbeatResponse.getResponseId() != responseId) {
        LOG.info("Ignoring 'duplicate' heartbeat from '" + trackerName +
                 "'; resending the previous 'lost' response");
        return prevHeartbeatResponse;
      }
    }
  }

  // Process this heartbeat
  short newResponseId = (short) (responseId + 1);
  status.setLastSeen(now);
  if (!processHeartbeat(status, initialContact, now)) {
    if (prevHeartbeatResponse != null) {
      trackerToHeartbeatResponseMap.remove(trackerName);
    }
    return new HeartbeatResponse(newResponseId,
        new TaskTrackerAction[] {new ReinitTrackerAction()});
  }

  // Initialize the response to be sent for the heartbeat
  HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
  List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
  boolean isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());

  // Check for new tasks to be executed on the tasktracker
  if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
    TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName);
    if (taskTrackerStatus == null) {
      LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
    } else {
      List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
      if (tasks == null) {
        tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
      }
      if (tasks != null) {
        for (Task task : tasks) {
          expireLaunchingTasks.addNewTask(task.getTaskID());
          if (LOG.isDebugEnabled()) {
            LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
          }
          actions.add(new LaunchTaskAction(task));
        }
      }
    }
  }

  // Check for tasks to be killed
  List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
  if (killTasksList != null) {
    actions.addAll(killTasksList);
  }

  // Check for jobs to be killed/cleanedup
  List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName);
  if (killJobsList != null) {
    actions.addAll(killJobsList);
  }

  // Check for tasks whose outputs can be saved
  List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
  if (commitTasksList != null) {
    actions.addAll(commitTasksList);
  }

  // calculate next heartbeat interval and put in heartbeat response
  int nextInterval = getNextHeartbeatInterval();
  response.setHeartbeatInterval(nextInterval);
  response.setActions(actions.toArray(new TaskTrackerAction[actions.size()]));

  // check if the restart info is req
  if (addRestartInfo) {
    response.setRecoveredJobs(recoveryManager.getJobsToRecover());
  }

  // Update the trackerToHeartbeatResponseMap
  trackerToHeartbeatResponseMap.put(trackerName, response);

  // Done processing the hearbeat, now remove 'marked' tasks
  removeMarkedTasks(trackerName);

  return response;
}
The scheduler is then invoked. The default scheduler is JobQueueTaskScheduler, and its assignTasks method looks like this:
public synchronized List<Task> assignTasks(TaskTracker taskTracker)
    throws IOException {
  TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
  // Get the status of the cluster
  ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
  final int numTaskTrackers = clusterStatus.getTaskTrackers();
  final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
  final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();

  Collection<JobInProgress> jobQueue =
      jobQueueJobInProgressListener.getJobQueue();

  //
  // Get map + reduce counts for the current tracker.
  //
  final int trackerMapCapacity = taskTrackerStatus.getMaxMapSlots();
  final int trackerReduceCapacity = taskTrackerStatus.getMaxReduceSlots();
  final int trackerRunningMaps = taskTrackerStatus.countMapTasks();
  final int trackerRunningReduces = taskTrackerStatus.countReduceTasks();

  // The tasks assigned in this heartbeat
  List<Task> assignedTasks = new ArrayList<Task>();

  //
  // Compute (running + pending) map and reduce task numbers across pool
  //
  int remainingReduceLoad = 0;
  int remainingMapLoad = 0;
  synchronized (jobQueue) {
    for (JobInProgress job : jobQueue) {
      if (job.getStatus().getRunState() == JobStatus.RUNNING) {
        remainingMapLoad += (job.desiredMaps() - job.finishedMaps());
        if (job.scheduleReduces()) {
          remainingReduceLoad +=
              (job.desiredReduces() - job.finishedReduces());
        }
      }
    }
  }

  // Compute the 'load factor' for maps and reduces
  double mapLoadFactor = 0.0;
  if (clusterMapCapacity > 0) {
    mapLoadFactor = (double) remainingMapLoad / clusterMapCapacity;
  }
  double reduceLoadFactor = 0.0;
  if (clusterReduceCapacity > 0) {
    reduceLoadFactor = (double) remainingReduceLoad / clusterReduceCapacity;
  }

  final int trackerCurrentMapCapacity =
      Math.min((int) Math.ceil(mapLoadFactor * trackerMapCapacity),
               trackerMapCapacity);
  int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;
  boolean exceededMapPadding = false;
  if (availableMapSlots > 0) {
    exceededMapPadding =
        exceededPadding(true, clusterStatus, trackerMapCapacity);
  }

  int numLocalMaps = 0;
  int numNonLocalMaps = 0;
  scheduleMaps:
  for (int i = 0; i < availableMapSlots; ++i) {
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING) {
          continue;
        }

        Task t = null;

        // Try to schedule a node-local or rack-local Map task
        t = job.obtainNewLocalMapTask(taskTrackerStatus, numTaskTrackers,
                taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          assignedTasks.add(t);
          ++numLocalMaps;

          // Don't assign map tasks to the hilt!
          // Leave some free slots in the cluster for future task-failures,
          // speculative tasks etc. beyond the highest priority job
          if (exceededMapPadding) {
            break scheduleMaps;
          }

          // Try all jobs again for the next Map task
          break;
        }

        // Try to schedule a non-local Map task
        t = job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers,
                taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          assignedTasks.add(t);
          ++numNonLocalMaps;

          // We assign at most 1 off-switch or speculative task
          // This is to prevent TaskTrackers from stealing local-tasks
          // from other TaskTrackers.
          break scheduleMaps;
        }
      }
    }
  }
  int assignedMaps = assignedTasks.size();

  //
  // Same thing, but for reduce tasks
  // However we _never_ assign more than 1 reduce task per heartbeat
  //
  final int trackerCurrentReduceCapacity =
      Math.min((int) Math.ceil(reduceLoadFactor * trackerReduceCapacity),
               trackerReduceCapacity);
  final int availableReduceSlots =
      Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1);
  boolean exceededReducePadding = false;
  if (availableReduceSlots > 0) {
    exceededReducePadding =
        exceededPadding(false, clusterStatus, trackerReduceCapacity);
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING ||
            job.numReduceTasks == 0) {
          continue;
        }

        Task t = job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers,
                     taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          assignedTasks.add(t);
          break;
        }

        // Don't assign reduce tasks to the hilt!
        // Leave some free slots in the cluster for future task-failures,
        // speculative tasks etc. beyond the highest priority job
        if (exceededReducePadding) {
          break;
        }
      }
    }
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Task assignments for " + taskTrackerStatus.getTrackerName() +
              " --> " + "[" + mapLoadFactor + ", " + trackerMapCapacity +
              ", " + trackerCurrentMapCapacity + ", " + trackerRunningMaps +
              "] -> [" + (trackerCurrentMapCapacity - trackerRunningMaps) +
              ", " + assignedMaps + " (" + numLocalMaps + ", " +
              numNonLocalMaps + ")] [" + reduceLoadFactor + ", " +
              trackerReduceCapacity + ", " + trackerCurrentReduceCapacity +
              "," + trackerRunningReduces + "] -> [" +
              (trackerCurrentReduceCapacity - trackerRunningReduces) + ", " +
              (assignedTasks.size() - assignedMaps) + "]");
  }

  return assignedTasks;
}
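To make the slot arithmetic above concrete, take a hypothetical lightly loaded cluster (all numbers invented for illustration):

// Mirrors the capacity computation in assignTasks, with invented numbers.
int clusterMapCapacity = 100, remainingMapLoad = 40;
double mapLoadFactor = (double) remainingMapLoad / clusterMapCapacity; // 0.4
int trackerMapCapacity = 4, trackerRunningMaps = 1;
int trackerCurrentMapCapacity =
    Math.min((int) Math.ceil(mapLoadFactor * trackerMapCapacity),
             trackerMapCapacity);                   // min(ceil(1.6), 4) = 2
int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps; // 1
// So this tracker gets at most one new map task in this heartbeat, even
// though it has three physically free slots: the load factor spreads work
// across the cluster instead of filling one tracker first.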
When the heartbeat response returned by the JobTracker contains assigned tasks (LaunchTaskAction), the TaskTracker calls its addToTaskQueue method, which hands each task to either the MapLauncher or the ReduceLauncher object inside the TaskTracker class. Both are instances of TaskLauncher, an inner class of TaskTracker that holds a queue of TaskTracker.TaskInProgress objects as a data member. If a task in the response is a map task it goes into mapLauncher's queue; if it is a reduce task it goes into reduceLauncher's taskToLaunch queue:
private void addToTaskQueue(LaunchTaskAction action) {
  if (action.getTask().isMapTask()) {
    mapLauncher.addToTaskQueue(action);
  } else {
    reduceLauncher.addToTaskQueue(action);
  }
}
The task is then registered, and the job configuration file is localized:
private TaskInProgress registerTask(LaunchTaskAction action,
                                    TaskLauncher launcher) {
  Task t = action.getTask();
  LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID() +
           " task's state:" + t.getState());
  TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher);
  synchronized (this) {
    tasks.put(t.getTaskID(), tip);
    runningTasks.put(t.getTaskID(), tip);
    boolean isMap = t.isMapTask();
    if (isMap) {
      mapTotal++;
    } else {
      reduceTotal++;
    }
  }
  return tip;
}

private Path localizeJobConfFile(Path jobFile, String user,
                                 FileSystem userFs, JobID jobId)
    throws IOException {
  // Get sizes of JobFile and JarFile
  // sizes are -1 if they are not present.
  FileStatus status = null;
  long jobFileSize = -1;
  try {
    status = userFs.getFileStatus(jobFile);
    jobFileSize = status.getLen();
  } catch (FileNotFoundException fe) {
    jobFileSize = -1;
  }

  Path localJobFile =
      lDirAlloc.getLocalPathForWrite(
          getPrivateDirJobConfFile(user, jobId.toString()),
          jobFileSize, fConf);

  // Download job.xml
  userFs.copyToLocalFile(jobFile, localJobFile);
  return localJobFile;
}
This performs a series of localization steps, copying the jar, the split file, and the job.xml file to the local machine. Once all the resources the task needs to run have been copied locally, TaskTracker's launchTaskForJob method is called, which in turn calls TaskTracker.TaskInProgress's launchTask function:
/**
 * Kick off the task execution
 */
public synchronized void launchTask(RunningJob rjob) throws IOException {
  if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
      this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
      this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
    localizeTask(task);
    if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
      this.taskStatus.setRunState(TaskStatus.State.RUNNING);
    }
    setTaskRunner(task.createRunner(TaskTracker.this, this, rjob));
    this.runner.start();
    long now = System.currentTimeMillis();
    this.taskStatus.setStartTime(now);
    this.lastProgressReport = now;
  } else {
    LOG.info("Not launching task: " + task.getTaskID() +
             " since it's state is " + this.taskStatus.getRunState());
  }
}

A TaskRunner object is then created to run the task:

public final void run() {
  String errorInfo = "Child Error";
  try {
    // before preparing the job localize
    // all the archives
    TaskAttemptID taskid = t.getTaskID();
    final LocalDirAllocator lDirAlloc =
        new LocalDirAllocator("mapred.local.dir");
    // simply get the location of the workDir and pass it to the child. The
    // child will do the actual dir creation
    final File workDir =
        new File(new Path(localdirs[rand.nextInt(localdirs.length)],
            TaskTracker.getTaskWorkDir(t.getUser(),
                taskid.getJobID().toString(),
                taskid.toString(),
                t.isTaskCleanupTask())).toString());

    String user = tip.getUGI().getUserName();

    if (!prepare()) {
      return;
    }

    // Accumulates class paths for child.
    List<String> classPaths =
        getClassPaths(conf, workDir, taskDistributedCacheManager);

    long logSize = TaskLog.getTaskLogLength(conf);

    // Build exec child JVM args.
    Vector<String> vargs = getVMArgs(taskid, workDir, classPaths, logSize);

    tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);

    // set memory limit using ulimit if feasible and necessary ...
    String setup = getVMSetupCmd();

    // Set up the redirection of the task's stdout and stderr streams
    File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());
    File stdout = logFiles[0];
    File stderr = logFiles[1];
    tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout,
                                                             stderr);

    Map<String, String> env = new HashMap<String, String>();
    errorInfo = getVMEnvironment(errorInfo, user, workDir, conf, env,
                                 taskid, logSize);

    // flatten the env as a set of export commands
    List<String> setupCmds = new ArrayList<String>();
    for (Entry<String, String> entry : env.entrySet()) {
      StringBuffer sb = new StringBuffer();
      sb.append("export ");
      sb.append(entry.getKey());
      sb.append("=\"");
      sb.append(entry.getValue());
      sb.append("\"");
      setupCmds.add(sb.toString());
    }
    setupCmds.add(setup);

    // Spawn the child JVM and wait for the task to finish
    launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);
    tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
    if (exitCodeSet) {
      if (!killed && exitCode != 0) {
        if (exitCode == 65) {
          tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());
        }
        throw new IOException("Task process exit with nonzero status of " +
                              exitCode + ".");
      }
    }
  } catch (FSError e) {
    LOG.fatal("FSError", e);
    try {
      tracker.fsErrorInternal(t.getTaskID(), e.getMessage());
    } catch (IOException ie) {
      LOG.fatal(t.getTaskID() + " reporting FSError", ie);
    }
  } catch (Throwable throwable) {
    LOG.warn(t.getTaskID() + " : " + errorInfo, throwable);
    Throwable causeThrowable = new Throwable(errorInfo, throwable);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    causeThrowable.printStackTrace(new PrintStream(baos));
    try {
      tracker.reportDiagnosticInfoInternal(t.getTaskID(), baos.toString());
    } catch (IOException e) {
      LOG.warn(t.getTaskID() + " Reporting Diagnostics", e);
    }
  } finally {
    // It is safe to call TaskTracker.TaskInProgress.reportTaskFinished with
    // *false* since the task has either
    // a) SUCCEEDED - which means commit has been done
    // b) FAILED - which means we do not need to commit
    tip.reportTaskFinished(false);
  }
}