这部分的计划是这样的,首先解释JobTracker的启动过程和作业从JobClient提交到JobTracker上;然后分析TaskTracker和heartbeat;最后将整个流程debug一遍来加深映象。
在看JobTracker源代码的时候就会发现,它里边有main()方法,这就说明了它是一个独立的java进程。在hadoop根目录下的bin文件夹中的hadoop脚本中可以看到,它指定了JobTracker类。如下图所示:
JobTracker的main()方法中最主要的是以下两条语句:
public static void main(String argv[]
) throws IOException, InterruptedException {
StringUtils.startupShutdownMessage(JobTracker.class, argv, LOG); try {
if(argv.length == 0) {
JobTracker tracker = startTracker(new JobConf());//用来生成JobTracker对象
tracker.offerService();//初始化JobTracker,并启动作业调度器
}
else {
if ("-dumpConfiguration".equals(argv[0]) && argv.length == 1) {
dumpConfiguration(new PrintWriter(System.out));
}
else {
System.out.println("usage: JobTracker [-dumpConfiguration]");
System.exit(-1);
}
}
} catch (Throwable e) {
LOG.fatal(StringUtils.stringifyException(e));
System.exit(-1);
}
}
startTracker()方法比较简单,通过几次方法调用最终生成JobTracker对象。下面重点分析offerService()方法。由于篇幅限制,只列出了最重要的部分:
public void offerService() throws InterruptedException, IOException {
...... // Initialize the JobTracker FileSystem within safemode
setSafeModeInternal(SafeModeAction.SAFEMODE_ENTER);
initializeFilesystem();
setSafeModeInternal(SafeModeAction.SAFEMODE_LEAVE); // Initialize JobTracker
initialize(); ......
taskScheduler.start();
首先进入安全模式下(SAFEMODE_ENTER),初始化文件系统,然后退出安全模式(SAFEMODE_LEAVE)。然后初始化JobTracker。最后启动作业调度器(TaskScheduler)。默认的作业调度器是JobQueueTaskScheduler,在mapred-default.xml中配置。所以taskScheduler.start()会调用JobQueueTaskScheduler的start()方法。如下所示:
JobQueueTaskScheduler使用FIFO来对job进行调度。下面来进入到JobQueueTaskScheduler来分析start()方法。
@Override
public synchronized void start() throws IOException {
super.start();
taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
eagerTaskInitializationListener.start();
taskTrackerManager.addJobInProgressListener(
eagerTaskInitializationListener);
}
这里用到了观察者模式,JobQueueTaskScheduler向JobTracker注册了两个JobInProgressListener:EagerTaskInitializationListener和JobQueueJobInProgressListener,分别用于作业初始化和作业排序。
这里的taskTrackerManager实际上是JobTracker,因为JobTracker的父类就是TaskTrackerManager。在JobTracker的startTracker()方法中,将JobTracker实例传递给TaskTrackerManager。如下所示:
public static JobTracker startTracker(JobConf conf, String identifier, boolean initialize)
throws IOException, InterruptedException {
DefaultMetricsSystem.initialize("JobTracker");
JobTracker result = null;
while (true) {
try {
result = new JobTracker(conf, identifier);
result.taskScheduler.setTaskTrackerManager(result);
......
在eagerTaskInitializationListener.start()方法启动一个线程JobInitManager,这个线程用来监控jobInitQueue,即List<JobInProgress>。当有新的job(JobInProgress)加入到队列中时,JobInitManager线程就对其进行初始化。
class JobInitManager implements Runnable { public void run() {
JobInProgress job = null;
while (true) {
try {
synchronized (jobInitQueue) {
while (jobInitQueue.isEmpty()) {
jobInitQueue.wait();
}
job = jobInitQueue.remove(0); //从队列中拿出一个job
}
threadPool.execute(new InitJob(job)); //对job进行初始化
} catch (InterruptedException t) {
LOG.info("JobInitManagerThread interrupted.");
break;
}
}
LOG.info("Shutting down thread pool");
threadPool.shutdownNow();
}
} class InitJob implements Runnable { private JobInProgress job; public InitJob(JobInProgress job) {
this.job = job;
} public void run() {
ttm.initJob(job); //实质上调用JobTracker的initJob()方法进行初始化
}
}
这里JobInitManager线程最终调用了JobTracker的initJob()方法来对job进行初始化。具体过程下篇文章中再写。
最后画个流程图来总结一下,画的不好,将就看一下吧。
本文基于hadoop1.2.1
如有错误,还请指正
参考文章:《Hadoop技术内幕 深入理解MapReduce架构设计与实现原理》 董西成