We analyze the MapReduce V1 processing flow based on the Hadoop 1.2.1 source code.
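The TaskTracker periodically sends heartbeats to the JobTracker. When the RPC call returns, the TaskTracker parses the result to get the instructions issued by the JobTracker. If one of them is an instruction to run a Task, i.e. a LaunchTaskAction, the TaskTracker prepares to run that Task on its node. A Task runs in a JVM instance isolated from the TaskTracker process, and that JVM instance is created with org.apache.hadoop.mapred.Child, so a good deal of preparation is needed before the Child VM can be created and the Task started. The launch process of a Task is shown in the sequence diagram below: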
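Based on the diagram above and the source code, we divide the launch of a Task into three main steps:
- Initializing the data structures that track Task execution
- Preparing the Job resources shared by the Tasks
- Launching the Task
Below we analyze each of these three steps in detail.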
Initializing the data structures that track Task execution
If the instruction is a LaunchTaskAction, the TaskTracker adds it to a Task launch queue so that it is processed asynchronously:
```java
private void addToTaskQueue(LaunchTaskAction action) {
  if (action.getTask().isMapTask()) {
    mapLauncher.addToTaskQueue(action);
  } else {
    reduceLauncher.addToTaskQueue(action);
  }
}
```
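Depending on its type, the Task is added to the queue of the matching TaskLauncher. TaskLauncher is a thread class: the TaskTracker creates two TaskLauncher threads, one that launches MapTasks and one that launches ReduceTasks. Here is the TaskLauncher constructor: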
```java
public TaskLauncher(TaskType taskType, int numSlots) {
  this.maxSlots = numSlots;
  this.numFreeSlots = new IntWritable(numSlots);
  this.tasksToLaunch = new LinkedList<TaskInProgress>();
  // ...
  setName("TaskLauncher for " + taskType + " tasks");
}
```
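In the constructor, taskType is the Task type (MapTask or ReduceTask), and numSlots is the maximum number of Tasks of that type that can run at the same time on one TaskTracker; both default to 2. When the TaskTracker initializes, it reads the mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum properties from mapred-site.xml and assigns them to the maxMapSlots and maxReduceSlots fields respectively, as in the TaskTracker constructor: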
```java
maxMapSlots = conf.getInt("mapred.tasktracker.map.tasks.maximum", 2);
maxReduceSlots = conf.getInt("mapred.tasktracker.reduce.tasks.maximum", 2);
```
Then, when the TaskTracker is created, it uses maxMapSlots and maxReduceSlots to create and start the two TaskLauncher threads:
```java
mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots);
reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots);
mapLauncher.start();
reduceLauncher.start();
```
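The type-based routing and the per-type slot limits described above can be modeled in a few lines. This is a minimal sketch, not Hadoop code: SketchTaskType, SketchLauncher and SketchTracker are hypothetical names, a plain Map stands in for the properties loaded from mapred-site.xml, and getInt() is a simplified stand-in for Configuration.getInt() that only handles a missing key.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Hypothetical model of the two per-type launchers; not Hadoop's classes.
enum SketchTaskType { MAP, REDUCE }

final class SketchLauncher {
  final SketchTaskType taskType;
  final int maxSlots;
  final List<String> tasksToLaunch = new LinkedList<>();

  SketchLauncher(SketchTaskType taskType, int maxSlots) {
    this.taskType = taskType;
    this.maxSlots = maxSlots;
  }
}

final class SketchTracker {
  final SketchLauncher mapLauncher;
  final SketchLauncher reduceLauncher;

  // conf stands in for the properties read from mapred-site.xml.
  SketchTracker(Map<String, String> conf) {
    int maxMapSlots = getInt(conf, "mapred.tasktracker.map.tasks.maximum", 2);
    int maxReduceSlots = getInt(conf, "mapred.tasktracker.reduce.tasks.maximum", 2);
    mapLauncher = new SketchLauncher(SketchTaskType.MAP, maxMapSlots);
    reduceLauncher = new SketchLauncher(SketchTaskType.REDUCE, maxReduceSlots);
  }

  // Simplified getInt(): a missing key falls back to the default value.
  static int getInt(Map<String, String> conf, String key, int defaultValue) {
    String value = conf.get(key);
    return value == null ? defaultValue : Integer.parseInt(value.trim());
  }

  // Same routing rule as TaskTracker.addToTaskQueue(): pick the launcher by type.
  void addToTaskQueue(String taskId, boolean isMapTask) {
    SketchLauncher launcher = isMapTask ? mapLauncher : reduceLauncher;
    launcher.tasksToLaunch.add(taskId);
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put("mapred.tasktracker.reduce.tasks.maximum", "4");
    SketchTracker tracker = new SketchTracker(conf);
    tracker.addToTaskQueue("attempt_200912121733_0002_m_000005_0", true);
    System.out.println(tracker.mapLauncher.maxSlots + " map slots, "
        + tracker.reduceLauncher.maxSlots + " reduce slots");
  }
}
```

Only the reduce limit is overridden in main(), so the map launcher keeps the default of 2 slots.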
The LaunchTaskAction is added to the TaskLauncher's queue by calling the TaskLauncher's addToTaskQueue() method:
```java
public void addToTaskQueue(LaunchTaskAction action) {
  synchronized (tasksToLaunch) {
    TaskInProgress tip = registerTask(action, this);
    tasksToLaunch.add(tip);
    tasksToLaunch.notifyAll();
  }
}
```
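addToTaskQueue() is the producer half of a classic monitor hand-off: it adds to tasksToLaunch while holding the queue's lock and calls notifyAll(), and the TaskLauncher thread waits on the same lock while the queue is empty. A minimal sketch of that pattern, using a hypothetical LaunchQueue class with plain strings in place of TaskInProgress objects:

```java
import java.util.LinkedList;

// Hypothetical sketch of the tasksToLaunch hand-off between the
// heartbeat-processing thread (producer) and a TaskLauncher (consumer).
final class LaunchQueue<T> {
  private final LinkedList<T> tasksToLaunch = new LinkedList<>();

  // Producer side, analogous to TaskLauncher.addToTaskQueue().
  void add(T tip) {
    synchronized (tasksToLaunch) {
      tasksToLaunch.add(tip);
      tasksToLaunch.notifyAll(); // wake any launcher blocked in take()
    }
  }

  // Consumer side, analogous to the start of TaskLauncher.run().
  T take() throws InterruptedException {
    synchronized (tasksToLaunch) {
      while (tasksToLaunch.isEmpty()) { // loop guards against spurious wake-ups
        tasksToLaunch.wait();           // releases the lock while waiting
      }
      return tasksToLaunch.remove(0);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    LaunchQueue<String> q = new LaunchQueue<>();
    Thread launcher = new Thread(() -> {
      try {
        System.out.println("launching " + q.take());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    launcher.start();
    q.add("attempt_200912121733_0002_m_000005_0");
    launcher.join();
  }
}
```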
The key call in addToTaskQueue() is registerTask(), which initializes the TaskInProgress structure that represents the Task on the TaskTracker side:
```java
private TaskInProgress registerTask(LaunchTaskAction action, TaskLauncher launcher) {
  Task t = action.getTask();
  LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID() + " task's state:" + t.getState());
  TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher);
  // ...
  tasks.put(t.getTaskID(), tip);
  runningTasks.put(t.getTaskID(), tip);
  boolean isMap = t.isMapTask();
  // ... (updates mapTotal or reduceTotal according to isMap)
  return tip;
}
```
In this method, the tasks map records every Task on this TaskTracker, both running and finished, while runningTasks holds only the Tasks currently running on it. In addition, mapTotal and reduceTotal count the MapTasks and ReduceTasks running on this TaskTracker.
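The difference between tasks and runningTasks, and the role of the two counters, can be sketched as follows. TaskRegistry is a hypothetical class that maps a task id to its isMap flag in place of TaskID to TaskInProgress; markDone() is an assumed completion hook added only to show that a finished Task leaves runningTasks but stays in tasks.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the bookkeeping done by registerTask().
final class TaskRegistry {
  // taskId -> isMap, standing in for TaskID -> TaskInProgress
  final Map<String, Boolean> tasks = new HashMap<>();        // all tasks seen
  final Map<String, Boolean> runningTasks = new HashMap<>(); // running only
  int mapTotal;
  int reduceTotal;

  synchronized void registerTask(String taskId, boolean isMap) {
    tasks.put(taskId, isMap);
    runningTasks.put(taskId, isMap);
    if (isMap) {
      mapTotal++;
    } else {
      reduceTotal++;
    }
  }

  // Assumed completion hook: the task is kept in tasks, removed from runningTasks.
  synchronized void markDone(String taskId) {
    Boolean isMap = runningTasks.remove(taskId);
    if (isMap == null) {
      return; // unknown or already finished
    }
    if (isMap) {
      mapTotal--;
    } else {
      reduceTotal--;
    }
  }

  public static void main(String[] args) {
    TaskRegistry reg = new TaskRegistry();
    reg.registerTask("attempt_200912121733_0002_m_000005_0", true);
    reg.markDone("attempt_200912121733_0002_m_000005_0");
    System.out.println(reg.tasks.size() + " known, " + reg.runningTasks.size() + " running");
  }
}
```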
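The TaskInProgress created from the LaunchTaskAction is added to the tasksToLaunch queue, and the TaskLauncher thread is notified. In its run() method, the TaskLauncher thread takes the TaskInProgress objects off the queue and checks whether the TaskTracker currently has enough resources to launch the Task; if so, it calls startNewTask() to launch it: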
```java
synchronized (tasksToLaunch) {
  while (tasksToLaunch.isEmpty()) {
    // ...
  }
  tip = tasksToLaunch.remove(0);
  task = tip.getTask();
  LOG.info("Trying to launch : " + tip.getTask().getTaskID() + " which needs " + task.getNumSlotsRequired() + " slots");
}
// ...
synchronized (numFreeSlots) {
  boolean canLaunch = true;
  while (numFreeSlots.get() < task.getNumSlotsRequired()) {
    if (!tip.canBeLaunched()) {
      // ...
    }
    LOG.info("TaskLauncher : Waiting for " + task.getNumSlotsRequired() + " to launch " + task.getTaskID() + ", currently we have " + numFreeSlots.get() + " free slots");
    // ...
  }
  // ...
  LOG.info("In TaskLauncher, current free slots : " + numFreeSlots.get() + " and trying to launch " + tip.getTask().getTaskID() + " which needs " + task.getNumSlotsRequired() + " slots");
  numFreeSlots.set(numFreeSlots.get() - task.getNumSlotsRequired());
  assert (numFreeSlots.get() >= 0);
}
// ...
if (!tip.canBeLaunched()) {
  LOG.info("Not launching task " + task.getTaskID() + " as it got killed externally. Task's state is " + tip.getRunState());
  addFreeSlots(task.getNumSlotsRequired());
  // ...
}
// ...
```
At this point, both the resource state of the TaskTracker's node and the state of the Task's TIP meet the requirements, and the Task can be launched.
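The slot logic above combines a blocking wait on numFreeSlots with a second "was it killed?" check after the slots are taken. A minimal sketch of that pattern, assuming a hypothetical SlotPool class with an int counter in place of IntWritable and a BooleanSupplier in place of tip.canBeLaunched(); unlike the real code, it does not lock the TIP for the second check.

```java
import java.util.function.BooleanSupplier;

// Hypothetical slot pool modeled on TaskLauncher's numFreeSlots handling.
final class SlotPool {
  private int numFreeSlots;

  SlotPool(int maxSlots) {
    this.numFreeSlots = maxSlots;
  }

  // Waits until `needed` slots are free. Each time the waiter wakes up it
  // re-checks canBeLaunched and gives up (taking nothing) if the task was killed.
  synchronized boolean acquire(int needed, BooleanSupplier canBeLaunched)
      throws InterruptedException {
    while (numFreeSlots < needed) {
      if (!canBeLaunched.getAsBoolean()) {
        return false;
      }
      wait();
    }
    numFreeSlots -= needed;
    return true;
  }

  // Counterpart of addFreeSlots(): return slots and wake up waiting launchers.
  synchronized void release(int n) {
    numFreeSlots += n;
    notifyAll();
  }

  synchronized int free() {
    return numFreeSlots;
  }

  // Acquire, then check once more that the task is still launchable; if it
  // was killed in the meantime, hand the slots back instead of launching.
  boolean acquireForLaunch(int needed, BooleanSupplier canBeLaunched)
      throws InterruptedException {
    if (!acquire(needed, canBeLaunched)) {
      return false;
    }
    if (!canBeLaunched.getAsBoolean()) {
      release(needed);
      return false;
    }
    return true;
  }

  public static void main(String[] args) throws InterruptedException {
    SlotPool pool = new SlotPool(2);
    System.out.println(pool.acquireForLaunch(1, () -> true) + ", free=" + pool.free());
  }
}
```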
Preparing the Job resources shared by the Tasks
startNewTask() launches the Task asynchronously, in a separate thread:
```java
void startNewTask(final TaskInProgress tip) throws InterruptedException {
  Thread launchThread = new Thread(new Runnable() {
    @Override
    public void run() {
      try {
        RunningJob rjob = localizeJob(tip);
        tip.getTask().setJobFile(rjob.getLocalizedJobConf().toString());
        launchTaskForJob(tip, new JobConf(rjob.getJobConf()), rjob);
      } catch (Throwable e) {
        // ...
      }
    }
  });
  // ...
}
```
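The Tasks running on a TaskTracker are grouped by Job: several of them may belong to the same Job. When the first Task of a Job arrives, no Task-to-Job mapping exists yet, so one has to be created. In other words, the TaskTracker also maintains the state of each Job, along with the state of all Tasks belonging to it. For example, if a user submits a request to kill a Job, all running Tasks of that Job should be killed.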
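The localizeJob() call in startNewTask() does the following:
- Creates a RunningJob object and adds it to runningJobs, the map the TaskTracker maintains from JobID to RunningJob; it also adds the Task's TIP object to the tasks collection maintained by the RunningJob.
- To finish initializing a Job, its information, such as the Job configuration, must also be downloaded from HDFS to the TaskTracker's node, where the Job's Tasks share it. Recall that when JobClient submits a Job, it copies the related resources to a designated directory on HDFS. For example, the resource files of a Job stored under /tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/ on HDFS are copied to a local directory on the TaskTracker, such as /tmp/mapred/local/ttprivate/taskTracker/shirdrn/jobcache/job_200912121733_0002/.
- Calls TaskController's initializeJob() method to initialize the Job's resources, which are shared by the group of Tasks belonging to that Job.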
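The Job-level bookkeeping in the first step (JobID to RunningJob, with the Job's Tasks grouped under it) can be sketched as below. JobRegistry and its nested RunningJob are hypothetical simplifications; in particular, using "first Task of the Job on this node" as the signal that localization is still needed is an assumption of the sketch, not necessarily how Hadoop tracks it.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of runningJobs: JobID -> RunningJob -> its tasks.
final class JobRegistry {
  static final class RunningJob {
    final String jobId;
    final Set<String> tasks = new LinkedHashSet<>();

    RunningJob(String jobId) {
      this.jobId = jobId;
    }
  }

  private final Map<String, RunningJob> runningJobs = new HashMap<>();

  // Returns true if this is the job's first task on this node, i.e. the job's
  // shared resources (job.xml, job.jar, ...) still have to be localized.
  synchronized boolean addTaskToJob(String jobId, String taskId) {
    boolean first = !runningJobs.containsKey(jobId);
    runningJobs.computeIfAbsent(jobId, RunningJob::new).tasks.add(taskId);
    return first;
  }

  // Killing a job means killing every task of that job on this node.
  synchronized Set<String> killJob(String jobId) {
    RunningJob rjob = runningJobs.remove(jobId);
    if (rjob == null) {
      return new LinkedHashSet<>();
    }
    return rjob.tasks;
  }

  public static void main(String[] args) {
    JobRegistry reg = new JobRegistry();
    reg.addTaskToJob("job_200912121733_0002", "attempt_200912121733_0002_m_000005_0");
    reg.addTaskToJob("job_200912121733_0002", "attempt_200912121733_0002_m_000006_0");
    System.out.println("tasks to kill: " + reg.killJob("job_200912121733_0002"));
  }
}
```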
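Here the TaskController implementation is LinuxTaskController. Calling its initializeJob() method actually builds a shell command line that creates the directories and copies the related resources on the TaskTracker node. An example of this command line: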
```bash
/usr/local/java/bin/java -classpath .:/usr/local/java/lib/*.jar:/usr/local/java/jre/lib/*.jar -Dhadoop.log.dir=/tmp/hadoop/logs -Dhadoop.root.logger=INFO,console -Djava.library.path= org.apache.hadoop.mapred.JobLocalizer shirdrn job_200912121733_0002
```
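This command line is executed with the ShellCommandExecutor utility, which starts a separate JVM instance that initializes the Job's resources and exits when done. As the command line shows, the main initialization work is done in JobLocalizer, which takes two arguments: the user and the jobid. JobLocalizer creates the resources of a Job that its Tasks share on the TaskTracker node; these directories and files include: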
```
${mapred.local.dir}/taskTracker/${user}
${mapred.local.dir}/taskTracker/${user}/jobcache
${mapred.local.dir}/taskTracker/${user}/jobcache/${jobid}/work
${mapred.local.dir}/taskTracker/${user}/jobcache/${jobid}/jars
${mapred.local.dir}/taskTracker/${user}/jobcache/${jobid}/jars/job.jar
${mapred.local.dir}/taskTracker/${user}/jobcache/${jobid}/job.xml
${mapred.local.dir}/taskTracker/${user}/jobcache/${jobid}/jobToken
${mapred.local.dir}/taskTracker/${user}/distcache
```
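The layout listed above is a fixed tree rooted at one mapred.local.dir entry, keyed first by user and then by jobid. A minimal sketch that derives each path with java.nio.file; JobLocalLayout is a hypothetical helper, not part of Hadoop, and it handles a single local directory even though mapred.local.dir may list several.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper reproducing the per-user / per-job layout
// under a single ${mapred.local.dir} root.
final class JobLocalLayout {
  private final Path root;
  private final String user;
  private final String jobId;

  JobLocalLayout(String localDir, String user, String jobId) {
    this.root = Paths.get(localDir);
    this.user = user;
    this.jobId = jobId;
  }

  Path userDir()      { return root.resolve("taskTracker").resolve(user); }
  Path jobCacheDir()  { return userDir().resolve("jobcache"); }
  Path jobDir()       { return jobCacheDir().resolve(jobId); }
  Path workDir()      { return jobDir().resolve("work"); }
  Path jarsDir()      { return jobDir().resolve("jars"); }
  Path jobJar()       { return jarsDir().resolve("job.jar"); }
  Path jobConf()      { return jobDir().resolve("job.xml"); }
  Path jobToken()     { return jobDir().resolve("jobToken"); }
  Path distCacheDir() { return userDir().resolve("distcache"); }

  public static void main(String[] args) {
    JobLocalLayout layout =
        new JobLocalLayout("/tmp/mapred/local", "shirdrn", "job_200912121733_0002");
    System.out.println(layout.jobConf());
  }
}
```

On a Unix-like system, main() prints /tmp/mapred/local/taskTracker/shirdrn/jobcache/job_200912121733_0002/job.xml.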
At this point, the Job resources shared by the group of Tasks on this TaskTracker node are in place, and the Task itself can be launched.
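Launching the Task
Launching a Task is relatively involved, so we explain it in several stages:
Preparing to launch the Task
After startNewTask() calls localizeJob() to initialize the Job's resources on the TaskTracker node, it calls launchTaskForJob() to enter the Task launch flow: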
```java
protected void launchTaskForJob(TaskInProgress tip, JobConf jobConf,
    RunningJob rjob) throws IOException {
  // ...
  jobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY, localStorage.getDirsString());
  tip.setJobConf(jobConf);
  // ...
}
```
The Task is launched by calling launchTask() on the TaskInProgress tip. Here is its implementation:
```java
public synchronized void launchTask(RunningJob rjob) throws IOException {
  if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
      this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
      this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
    localizeTask(task);
    if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
      this.taskStatus.setRunState(TaskStatus.State.RUNNING);
    }
    setTaskRunner(task.createRunner(TaskTracker.this, this, rjob));
    // ...
    long now = System.currentTimeMillis();
    this.taskStatus.setStartTime(now);
    this.lastProgressReport = now;
  } else {
    LOG.info("Not launching task: " + task.getTaskID() + " since it's state is " + this.taskStatus.getRunState());
  }
}
```
The TaskInProgress keeps the TIP's state in taskStatus. As the code shows, a Task can be launched only if it is in one of three states: UNASSIGNED, FAILED_UNCLEAN, or KILLED_UNCLEAN.
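The launch condition in launchTask() is a set-membership test, plus one state transition that applies only to a fresh Task. A minimal sketch with a hypothetical RunState enum that lists only a few states for illustration; it is not Hadoop's TaskStatus.State.

```java
import java.util.EnumSet;

// Hypothetical subset of task states, for illustration only.
enum RunState { UNASSIGNED, RUNNING, SUCCEEDED, FAILED_UNCLEAN, KILLED_UNCLEAN }

final class LaunchGate {
  private static final EnumSet<RunState> LAUNCHABLE =
      EnumSet.of(RunState.UNASSIGNED, RunState.FAILED_UNCLEAN, RunState.KILLED_UNCLEAN);

  static boolean canLaunch(RunState state) {
    return LAUNCHABLE.contains(state);
  }

  // Mirrors the inner if in launchTask(): only UNASSIGNED moves to RUNNING;
  // the *_UNCLEAN states keep their state, and non-launchable states are
  // left untouched (the real code just logs "Not launching task").
  static RunState stateAfterLaunch(RunState state) {
    return state == RunState.UNASSIGNED ? RunState.RUNNING : state;
  }

  public static void main(String[] args) {
    for (RunState s : RunState.values()) {
      System.out.println(s + ": canLaunch=" + canLaunch(s) + ", next=" + stateAfterLaunch(s));
    }
  }
}
```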
Inside launchTask(), the Task is first localized by calling localizeTask():
```java
void localizeTask(Task task) throws IOException {
  task.localizeConfiguration(localJobConf);
  task.setConf(localJobConf);
}
```
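Here the Task may be a MapTask or a ReduceTask, so the initialization performed by task.localizeConfiguration() differs slightly; see the MapTask and ReduceTask classes for details. Likewise, each type of Task creates its own kind of TaskRunner thread, MapTaskRunner or ReduceTaskRunner respectively. Both extend TaskRunner, which contains the logic for launching a Task.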
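In TaskRunner, the main logic is in the run() method. Before it calls launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir), it does some preparation:
- Build setupCmds: read environment variables from the system or set by Hadoop, namely LD_LIBRARY_PATH, USER, SHELL, LOGNAME, HOME, HADOOP_TOKEN_FILE_LOCATION, HADOOP_ROOT_LOGGER and HADOOP_CLIENT_OPTS. These are key-value pairs, which are finally exported into the current environment with export.
- Build vargs: the options for launching the Child VM, read from the mapred.map.child.java.opts and mapred.reduce.child.java.opts properties in mapred-site.xml. In the end, org.apache.hadoop.mapred.Child is used to create the JVM instance that runs the Task.
- Set up directories and files: the two log files stdout and stderr, and workDir, the working directory of the JVM being launched.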
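The first two preparation steps reduce to building two lists of strings: export lines for the environment, and the java command with Child as its main class. A minimal sketch under stated assumptions: ChildCommandSketch is hypothetical, the export KEY="value" quoting is assumed, and splitting the child Java options on whitespace is a simplification that would break options containing quoted spaces.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of building setupCmds and vargs for the Child VM.
final class ChildCommandSketch {
  // One export line per environment variable, in insertion order.
  static List<String> setupCmds(Map<String, String> env) {
    List<String> cmds = new ArrayList<>();
    for (Map.Entry<String, String> e : env.entrySet()) {
      cmds.add("export " + e.getKey() + "=\"" + e.getValue() + "\"");
    }
    return cmds;
  }

  // java <child java opts> -classpath <cp> org.apache.hadoop.mapred.Child <args...>
  static List<String> vargs(String javaBin, String childJavaOpts, String classpath,
      List<String> childArgs) {
    List<String> v = new ArrayList<>();
    v.add(javaBin);
    String opts = childJavaOpts.trim();
    if (!opts.isEmpty()) {
      v.addAll(Arrays.asList(opts.split("\\s+"))); // naive split, see lead-in
    }
    v.add("-classpath");
    v.add(classpath);
    v.add("org.apache.hadoop.mapred.Child");
    v.addAll(childArgs);
    return v;
  }

  public static void main(String[] args) {
    Map<String, String> env = new LinkedHashMap<>();
    env.put("USER", "shirdrn");
    env.put("HOME", "/home/shirdrn");
    System.out.println(setupCmds(env));
    System.out.println(vargs("java", "-Xmx512M -verbose:gc", ".",
        Arrays.asList("127.0.0.1", "0")));
  }
}
```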
Managing Task launch data with JvmManager
After this preparation, launchJvmAndWait() is called to create the Child VM instance:
```java
void launchJvmAndWait(List<String> setup, Vector<String> vargs, File stdout,
    File stderr, long logSize, File workDir)
    throws InterruptedException, IOException {
  jvmManager.launchJvm(this, jvmManager.constructJvmEnv(setup, vargs, stdout, stderr, logSize, workDir, conf));
  // ...
}
```
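The JVM instance is ultimately created through JvmManager. The data structures JvmManager keeps to track JVM-related data are shown in the figure below:
As the figure shows, a JvmManager owns two JvmManagerForType instances, which manage the Child VMs and related data for MapTasks and ReduceTasks respectively. The JvmManager constructor: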
```java
public JvmManager(TaskTracker tracker) {
  mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(), true, tracker);
  reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(), false, tracker);
}
```
Internally, jvmManager.launchJvm() picks mapJvmManager or reduceJvmManager according to the Task type and calls its reapJvm() method:
```java
private synchronized void reapJvm(TaskRunner t, JvmEnv env) throws IOException, InterruptedException {
  if (t.getTaskInProgress().wasKilled()) {
    // ...
  }
  boolean spawnNewJvm = false;
  JobID jobId = t.getTask().getJobID();
  int numJvmsSpawned = jvmIdToRunner.size();
  JvmRunner runnerToKill = null;
  // ...
  if (numJvmsSpawned >= maxJvms) {
    Iterator<Map.Entry<JVMId, JvmRunner>> jvmIter = jvmIdToRunner.entrySet().iterator();
    while (jvmIter.hasNext()) {
      JvmRunner jvmRunner = jvmIter.next().getValue();
      JobID jId = jvmRunner.jvmId.getJobId();
      if (jId.equals(jobId) && !jvmRunner.isBusy() && !jvmRunner.ranAll()) {
        setRunningTaskForJvm(jvmRunner.jvmId, t);
        LOG.info("No new JVM spawned for jobId/taskid: " + jobId + "/" + t.getTask().getTaskID() + ". Attempting to reuse: " + jvmRunner.jvmId);
        // ...
      }
      // ...
      if ((jId.equals(jobId) && jvmRunner.ranAll()) || (!jId.equals(jobId) && !jvmRunner.isBusy())) {
        runnerToKill = jvmRunner;
        // ...
      }
    }
    // ...
  }
  // ...
  if (runnerToKill != null) {
    LOG.info("Killing JVM: " + runnerToKill.jvmId);
    killJvmRunner(runnerToKill);
  }
  // ...
  spawnNewJvm(jobId, env, t);
  // ... try { ... } catch (Exception e) { ... }
}
```
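The reuse, kill and spawn rules in reapJvm() can be isolated as a pure decision function. A minimal sketch with hypothetical names (JvmReuseSketch, Jvm, Action, Decision) and plain strings for job and JVM ids. Two points are assumptions because the excerpt elides them: a successful reuse ends the search immediately, and the "at capacity but nothing to reuse or kill" case is simply reported as NONE.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical pure-function model of the decision made in reapJvm().
final class JvmReuseSketch {
  static final class Jvm {
    final String jvmId;
    final String jobId;
    final boolean busy;   // currently running a task
    final boolean ranAll; // already ran as many tasks as it may run

    Jvm(String jvmId, String jobId, boolean busy, boolean ranAll) {
      this.jvmId = jvmId;
      this.jobId = jobId;
      this.busy = busy;
      this.ranAll = ranAll;
    }
  }

  enum Action { SPAWN, REUSE, KILL_AND_SPAWN, NONE }

  static final class Decision {
    final Action action;
    final String jvmId; // JVM reused or killed; null otherwise

    Decision(Action action, String jvmId) {
      this.action = action;
      this.jvmId = jvmId;
    }
  }

  static Decision reap(List<Jvm> jvms, int maxJvms, String jobId) {
    if (jvms.size() < maxJvms) {
      return new Decision(Action.SPAWN, null); // below the cap: just spawn
    }
    Jvm toKill = null;
    for (Jvm j : jvms) {
      boolean sameJob = j.jobId.equals(jobId);
      if (sameJob && !j.busy && !j.ranAll) {
        return new Decision(Action.REUSE, j.jvmId); // assumed: reuse ends the search
      }
      if ((sameJob && j.ranAll) || (!sameJob && !j.busy)) {
        toKill = j; // keep scanning: an idle JVM of the same job is preferred
      }
    }
    return toKill != null
        ? new Decision(Action.KILL_AND_SPAWN, toKill.jvmId)
        : new Decision(Action.NONE, null);
  }

  public static void main(String[] args) {
    List<Jvm> jvms = Arrays.asList(
        new Jvm("jvm_1", "job_B", false, false),
        new Jvm("jvm_2", "job_A", true, false));
    Decision d = reap(jvms, 2, "job_A");
    System.out.println(d.action + " " + d.jvmId);
  }
}
```

In main(), the cap of 2 is reached, job_A's only JVM is busy, and job_B's JVM is idle, so the sketch chooses to kill jvm_1 and spawn a new JVM.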
In reapJvm(), the call to setRunningTaskForJvm() is the key step: it maps the Task to be launched to a JvmRunner and updates the corresponding in-memory data structures (maps):
```java
synchronized public void setRunningTaskForJvm(JVMId jvmId,
    TaskRunner t) {
  jvmToRunningTask.put(jvmId, t);
  runningTaskToJvm.put(t, jvmId);
  jvmIdToRunner.get(jvmId).setBusy(true);
}
```
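setRunningTaskForJvm() keeps a two-way mapping between JVMs and Tasks, so either side can be looked up, and marks the JVM busy. A minimal sketch, assuming a hypothetical JvmTaskBinding class with string ids and a busy map standing in for JvmRunner.setBusy(); taskFinished() is an assumed inverse operation added only to show how the pair of maps would be kept consistent.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the JVM <-> task bookkeeping in JvmManagerForType.
final class JvmTaskBinding {
  final Map<String, String> jvmToRunningTask = new HashMap<>();
  final Map<String, String> runningTaskToJvm = new HashMap<>();
  final Map<String, Boolean> busy = new HashMap<>(); // stands in for JvmRunner.isBusy()

  synchronized void setRunningTaskForJvm(String jvmId, String task) {
    jvmToRunningTask.put(jvmId, task);
    runningTaskToJvm.put(task, jvmId);
    busy.put(jvmId, true);
  }

  // Assumed inverse: unbind a finished task and mark its JVM idle again.
  synchronized void taskFinished(String task) {
    String jvmId = runningTaskToJvm.remove(task);
    if (jvmId != null) {
      jvmToRunningTask.remove(jvmId);
      busy.put(jvmId, false);
    }
  }

  public static void main(String[] args) {
    JvmTaskBinding b = new JvmTaskBinding();
    b.setRunningTaskForJvm("jvm_1", "attempt_200912121733_0002_m_000005_0");
    b.taskFinished("attempt_200912121733_0002_m_000005_0");
    System.out.println("jvm_1 busy: " + b.busy.get("jvm_1"));
  }
}
```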
setRunningTaskForJvm() is also called from spawnNewJvm(), which creates a new JVM:
```java
private void spawnNewJvm(JobID jobId, JvmEnv env, TaskRunner t) {
  JvmRunner jvmRunner = new JvmRunner(env, jobId, t.getTask());
  jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
  jvmRunner.setDaemon(true);
  jvmRunner.setName("JVM Runner " + jvmRunner.jvmId + " spawned.");
  setRunningTaskForJvm(jvmRunner.jvmId, t);
  LOG.info(jvmRunner.getName());
  // ...
}
```
Next, consider the JvmRunner thread class. Its run() method simply calls runChild(), which is implemented as follows:
```java
public void runChild(JvmEnv env) throws IOException, InterruptedException {
  // ...
    env.vargs.add(Integer.toString(jvmId.getId()));
    TaskRunner runner = jvmToRunningTask.get(jvmId);
    // ...
      Task task = runner.getTask();
      String user = task.getUser();
      TaskAttemptID taskAttemptId = task.getTaskID();
      String taskAttemptIdStr = task.isTaskCleanupTask()
          ? (taskAttemptId.toString() + TaskTracker.TASK_CLEANUP_SUFFIX)
          : taskAttemptId.toString();
      exitCode = tracker.getTaskController().launchTask(user, jvmId.jobId.toString(), taskAttemptIdStr,
          env.setup, env.vargs, env.workDir, env.stdout.toString(), env.stderr.toString());
    // ...
  } catch (IOException ioe) {
    // ...
    updateOnJvmExit(jvmId, exitCode);
    LOG.info("JVM : " + jvmId + " exited with exit code " + exitCode + ". Number of tasks it ran: " + numTasksRan);
    deleteWorkDir(tracker, firstTask);
  // ...
}
```
JvmRunner delegates the actual launch of the Task to the TaskController.
Launching the Child VM with TaskController
Next, let's look at launchTask(), the TaskController method that launches the Task:
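Two details of runChild() are easy to miss: the JVM id is appended as the last Child argument, and a cleanup attempt's id string gets a suffix. A minimal sketch of both; RunChildSketch is hypothetical, and the ".cleanup" value for TaskTracker.TASK_CLEANUP_SUFFIX is an assumption to verify against your Hadoop version. Unlike runChild(), which appends to env.vargs in place, the sketch returns a copy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of two steps performed by JvmRunner.runChild().
final class RunChildSketch {
  // ASSUMPTION: value of TaskTracker.TASK_CLEANUP_SUFFIX.
  static final String TASK_CLEANUP_SUFFIX = ".cleanup";

  // Same rule as runChild(): a cleanup attempt's id string carries the suffix.
  static String attemptIdStr(String taskAttemptId, boolean isTaskCleanupTask) {
    return isTaskCleanupTask ? taskAttemptId + TASK_CLEANUP_SUFFIX : taskAttemptId;
  }

  // runChild() appends the JVM id as the last argument of the Child command.
  static List<String> withJvmId(List<String> vargs, int jvmId) {
    List<String> out = new ArrayList<>(vargs);
    out.add(Integer.toString(jvmId));
    return out;
  }

  public static void main(String[] args) {
    System.out.println(attemptIdStr("attempt_200912121733_0002_m_000005_0", true));
    System.out.println(withJvmId(Arrays.asList("java", "org.apache.hadoop.mapred.Child"), 2134));
  }
}
```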
```java
public int launchTask(String user, String jobId, String attemptId,
    List<String> setup, List<String> jvmArguments,
    File currentWorkDirectory, String stdout, String stderr) throws IOException {
  ShellCommandExecutor shExec = null;
  // ...
    FileSystem rawFs = FileSystem.getLocal(getConf()).getRaw();
    // ...
    String cmdLine = TaskLog.buildCommandLine(setup, jvmArguments,
        new File(stdout), new File(stderr), logSize, true);
    // ...
    Path p = new Path(allocator.getLocalPathForWrite(
        TaskTracker.getPrivateDirTaskScriptLocation(user, jobId, attemptId), getConf()),
        COMMAND_FILE);
    String commandFile = writeCommand(cmdLine, rawFs, p);
    // ...
        new String[]{taskControllerExe,
            // ...
            localStorage.getDirsString(),
            Integer.toString(Commands.LAUNCH_TASK_JVM.getValue()),
            // ...
            currentWorkDirectory.toString(),
            // ...
    shExec = new ShellCommandExecutor(command);
    if (LOG.isDebugEnabled()) {
      LOG.debug("launchTask: " + Arrays.toString(command));
    }
    // ...
  } catch (Exception e) {
    // ...
    int exitCode = shExec.getExitCode();
    LOG.warn("Exit code from task is : " + exitCode);
    // ...
    if (exitCode != 143 && exitCode != 137) {
      LOG.warn("Exception thrown while launching task JVM : " + StringUtils.stringifyException(e));
      LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
      logOutput(shExec.getOutput());
    }
    // ...
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("Output from LinuxTaskController's launchTask follows:");
    logOutput(shExec.getOutput());
  }
  // ...
}
```
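The exit-code test in the catch block follows the usual Unix convention that a process terminated by signal N is reported with status 128 + N, so 143 = 128 + 15 (SIGTERM) and 137 = 128 + 9 (SIGKILL). Presumably these are skipped because they indicate a task JVM that was terminated on purpose rather than one that failed to launch. A minimal sketch of the arithmetic; TaskExitCodes is a hypothetical class.

```java
// Hypothetical helper spelling out the 143/137 check in launchTask().
final class TaskExitCodes {
  static final int SIGKILL = 9;
  static final int SIGTERM = 15;

  // Unix convention: "terminated by signal N" is reported as 128 + N.
  static boolean killedBy(int exitCode, int signal) {
    return exitCode == 128 + signal;
  }

  // A non-zero status other than 143 (SIGTERM) or 137 (SIGKILL) is
  // treated as a genuine failure and its output gets logged.
  static boolean isUnexpectedFailure(int exitCode) {
    return exitCode != 0 && !killedBy(exitCode, SIGTERM) && !killedBy(exitCode, SIGKILL);
  }

  public static void main(String[] args) {
    for (int code : new int[] {0, 1, 137, 143}) {
      System.out.println(code + " -> unexpected failure: " + isUnexpectedFailure(code));
    }
  }
}
```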
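launchTask() writes the constructed command line for launching Child to a file in a local directory. An example of the absolute path of this script file: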
```
/tmp/mapred/local/ttprivate/taskTracker/shirdrn/jobcache/job_200912121733_0002/attempt_200912121733_0002_m_000005_0/taskjvm.sh
```
In the launchTask() method of TaskController (in fact LinuxTaskController), ShellCommandExecutor executes a command line similar to the following:
```bash
/usr/local/hadoop/bin/task-controller shirdrn /tmp/mapred/local 1 job_200912121733_0002 attempt_200912121733_0002_m_000005_0 /tmp/mapred/local/ttprivate/taskTracker/shirdrn/jobcache/job_200912121733_0002/attempt_200912121733_0002_m_000005_0/taskjvm.sh
```
The taskjvm.sh script contains the command line that actually starts the Child VM, for example:
```bash
/usr/local/bin/java -Xmx512M -verbose:gc -Xloggc:/tmp/attempt_200912121733_0002_m_000005_0.gc -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.library.path= -Djava.io.tmpdir= -classpath .:/usr/local/java/lib/*.jar:/usr/local/java/jre/lib/*.jar -Dlog4j.configuration=task-log4j.properties -Dhadoop.log.dir=/tmp/hadoop/logs -Dhadoop.root.logger=INFO,TLA -Dhadoop.tasklog.taskid=attempt_200912121733_0002_m_000005_0 -Dhadoop.tasklog.iscleanup=false -Dhadoop.tasklog.totalLogFileSize= org.apache.hadoop.mapred.Child 127.0.0.1 0 attempt_200912121733_0002_m_000005_0 /tmp/hadoop/logs/userlogs/job_200912121733_0002/attempt_200912121733_0002_m_000005_0/ 2134
```
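The five arguments after org.apache.hadoop.mapred.Child in this example line up with what the earlier steps produced: the attempt id, the per-attempt log directory, and, last, the JVM id that runChild() appended. A minimal sketch that unpacks them; ChildArgs is hypothetical, and reading the first two arguments as the host and port of the TaskTracker that the child reports to is an assumption not spelled out in the code shown here.

```java
// Hypothetical holder for the five arguments that follow
// org.apache.hadoop.mapred.Child on the command line.
final class ChildArgs {
  final String host;        // assumed: TaskTracker host the child reports to
  final int port;           // assumed: TaskTracker port the child reports to
  final String attemptId;   // first task attempt this JVM runs
  final String logLocation; // per-attempt log directory
  final int jvmId;          // appended last by JvmRunner.runChild()

  ChildArgs(String[] args) {
    if (args.length < 5) {
      throw new IllegalArgumentException("expected 5 arguments, got " + args.length);
    }
    host = args[0];
    port = Integer.parseInt(args[1]);
    attemptId = args[2];
    logLocation = args[3];
    jvmId = Integer.parseInt(args[4]);
  }

  public static void main(String[] args) {
    ChildArgs a = new ChildArgs(new String[] {
        "127.0.0.1", "0", "attempt_200912121733_0002_m_000005_0",
        "/tmp/hadoop/logs/userlogs/job_200912121733_0002/attempt_200912121733_0002_m_000005_0/",
        "2134"});
    System.out.println(a.attemptId + " runs in JVM " + a.jvmId);
  }
}
```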
At this point the Task has been loaded into a Child VM and started, ready to run; we will cover how the Task runs in a later article.