前言 :本文旨在理清在Hadoop中一个MapReduce作业(Job)在提交到框架后的整个生命周期过程,权作总结和日后参考,如有问题,请不吝赐教。本文不涉及Hadoop的架构设计,如有兴趣请参考相关书籍和文献。在梳 理过程中,我对一些感兴趣的源码也会逐行研究学习,以期强化基础。
作者
:Jaytalent
开始日期
:2013年9月9日参考资料:【1】《Hadoop技术内幕--深入解析MapReduce架构设计与实现原理》董西成【2】Hadoop 1.0.0 源码
上一篇文章中,作业准备提交到JobTracker了。本文关注作业在提交到JobTracker后且在执行前经历了哪些事情。
【1】:
作业提交与初始化
一、作业提交与初始化
JobClient.submitJobInternal方法中,最后一步就是将作业提交到JobTracker:
status = jobSubmitClient.submitJob(jobId, submitJobDir.toString(), jobCopy.getCredentials());
这个方法所属的接口在 上一篇文章中有所提及,其实现有两个:JobTracker和LocalRunner。LocalRunner是用于执行本地作业的,当Hadoop配置为本地模式时采用该类处理作业。我们关注JobTracker.submitJob方法。这里多说一句,在JobClient对象初始化时有如下代码:
/**
* Connect to the default {@link JobTracker}.
* @param conf the job configuration.
* @throws IOException
*/
public void init(JobConf conf) throws IOException {
String tracker = conf.get("mapred.job.tracker", "local");
tasklogtimeout = conf.getInt(
TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);
this.ugi = UserGroupInformation.getCurrentUser();
if ("local".equals(tracker)) {
conf.setNumMapTasks(1);
this.jobSubmitClient = new LocalJobRunner(conf);
} else {
this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
}
}
可以看到,tracker变量是从mapred.job.tracker配置获取值的,默认值为字符串“local”。因此,如果没有在配置文件配置这个值或者JobConf对象中没有添加配置文件(通常为mapred-site.xml)资源,jobSubmitClient就会使用LocalJobRunner进行初始化。
回到正题,
JobTracker.submitJob方法会做如下工作:
// Create the JobInProgress, do not lock the JobTracker since
// we are about to copy job.xml from HDFS
JobInProgress job = null;
try {
job = new JobInProgress(this, this.conf, jobInfo, 0, ts);
} catch (Exception e) {
throw new IOException(e);
}
我们可以进入构造函数内部看看JobInProgress都有什么内容。
this.jobtracker = jobtracker;
this.status = new JobStatus(jobId, 0.0f, 0.0f, JobStatus.PREP);
JobStatus对象也比较关键,它维护了作业的一些状态信息如作业优先级,开始时间,map和reduce任务的进度等。
其他信息在后面分析JobTracker实现的时候再详述。
// check if queue is RUNNING
String queue = job.getProfile().getQueueName();
if (!queueManager.isRunning(queue)) {
throw new IOException("Queue \"" + queue + "\" is not running");
}
try {
aclsManager.checkAccess(job, ugi, Operation.SUBMIT_JOB);
} catch (IOException ioe) {
LOG.warn("Access denied for user " + job.getJobConf().getUser()
+ ". Ignoring job " + jobId, ioe);
job.fail();
throw ioe;
}
c. 检查作业的内存使用量。用户在配置文件中可以配置Map任务和Reduce任务的内存使用量,而这些值不能超过管理员所配置的最大使用量,否则作业提交就会失败。
// Check the job if it cannot run in the cluster because of invalid memory
// requirements.
try {
checkMemoryRequirements(job);
} catch (IOException ioe) {
throw ioe;
}
d. 调用调度器模块,对作业进行初始化。具体过程如下
// Submit the job
JobStatus status;
status = addJob(jobId, job);
在addJob方法中,作业首先被加入到已经提交的作业列表中,然后通知JobTracker所有的监听器对象,当前作业被提交,并采取相应的行动。
synchronized (jobs) {
synchronized (taskScheduler) {
jobs.put(job.getProfile().getJobID(), job);
for (JobInProgressListener listener : jobInProgressListeners) {
listener.jobAdded(job);
}
}
}
其中,任务调度器taskScheduler对象是在JobTracker构造时创建出来的。调度器对象和JobTracker对象是互相包含的关系。
// Create the scheduler
Class<? extends TaskScheduler> schedulerClass
= conf.getClass("mapred.jobtracker.taskScheduler",
JobQueueTaskScheduler.class, TaskScheduler.class);
taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
可以看出,Hadoop任务调度器时可插拔的模块,调度器的类型通过配置文件获得,并使用反射机制实例化。用户可以通过继承TaskScheduler类实现自己的调度器(由于做研究需要,本人实现了一个简单的调度器,实现过程日后分享)。Hadoop默认的调度器为JobQueueTaskScheduler,调度策略为先进先出(FIFO)。
public synchronized void start() throws IOException {
super.start();
taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
eagerTaskInitializationListener.start();
taskTrackerManager.addJobInProgressListener(
eagerTaskInitializationListener);
}
其中taskTrackerManager对象就是JobTracker,通过addJobInProgressListener方法注册监听器。这样,当有JobTracker发现有作业被提交、更新或删除时,就会通知订阅者TaskScheduler,并调用相应的回调函数,如上面提到的listener.jobAdded方法。更多调度器的细节请关注后续文章。
// create two setup tips, one map and one reduce.
setup = new TaskInProgress[2];
// setup map tip. This map doesn't use any split. Just assign an empty
// split.
setup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
jobtracker, conf, this, numMapTasks + 1, 1);
setup[0].setJobSetupTask();
// setup reduce tip.
setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
numReduceTasks + 1, jobtracker, conf, this, 1);
setup[1].setJobSetupTask();
maps = new TaskInProgress[numMapTasks];
for(int i=0; i < numMapTasks; ++i) {
inputLength += splits[i].getInputDataLength();
maps[i] = new TaskInProgress(jobId, jobFile,
splits[i],
jobtracker, conf, this, i, numSlotsPerMap);
}
TaskInProgrees维护任务运行时信息,与JobInProgress类似。
this.reduces = new TaskInProgress[numReduceTasks];
for (int i = 0; i < numReduceTasks; i++) {
reduces[i] = new TaskInProgress(jobId, jobFile,
numMapTasks, i,
jobtracker, conf, this, numSlotsPerReduce);
nonRunningReduces.add(reduces[i]);
}
用户可以在配置文件中指定Reduce任务的个数。
// create cleanup two cleanup tips, one map and one reduce.
cleanup = new TaskInProgress[2];
// cleanup map tip. This map doesn't use any splits. Just assign an empty
// split.
TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
jobtracker, conf, this, numMapTasks, 1);
cleanup[0].setJobCleanupTask();
// cleanup reduce tip.
cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
numReduceTasks, jobtracker, conf, this, 1);
cleanup[1].setJobCleanupTask();
- 作业初始化后会占用内存资源,如果有大量初始化作业在JobTracker等待调度就会占用不必要的资源。在交给调度器后,Hadoop按照一定策略选择性地初始化以节省内存资源。
- 只有经过初始化的作业才能得到调度,因此将初始化工作嵌入调度器中比较合理。