一、几个可能会用到的属性值
1、mapred.map.tasks.speculative.execution和mapred.reduce.tasks.speculative.execution
这两个属性可以决定Map任务和Reduce任务是否开启推测式执行策略。推测式执行策略在Hadoop中用来应对执行缓慢的任务所造成的瓶颈,但是对代码缺陷所导致的任务执行过慢,推测执行是一种反向的作用,应当避免,而Hadoop默认是开启推测式执行的。
2、mapred.job.reuse.jvm.num.tasks
这个属性值默认为1,表示一个JVM上只运行一个任务。如果Task Tracker具有很多小任务,重复启动JVM是非常耗时的,可以考虑任务JVM重用。将属性值设置为大于1的值表示启用了JVM;设置为-1表示共享此JVM的任务数目不受限制。尽管启用了JVM重用,但是Task Tracker上的多个任务还是顺序执行的,只是不需要额外启动JVM而已。
3、Task Tracker的本地参数值
名称 | 类型 | 描述 |
mapred.job.id | String | job id |
mapred.jar | String | job目录下job.jar的位置 |
job.local.dir | String | job指定的共享存储空间 |
mapred.tip.id | String | task id |
mapred.task.id | String | task尝试id |
mapred.task.partition | int | task在job中的id |
map.input.file | String | map读取的文件名 |
map.input.start | long | map输入的数据块的起始位置的偏移 |
map.input.length | long | map输入的数据块的字节数 |
mapred.work.output.dir | String | task临时输出目录 |
二、Hadoop工作流程
以下Hadoop源码来源于hadoop-1.2.1。
1、作业的提交
hadoop项目的第一步自然是根据需求进行代码编写,而后是需要配置Map类、Reduce类、Input路径、Output路径以及其他的虚拟机配置等。但这些操作只是为hadoop形成了一个作业,还算不上Hadoop的工作流程,将项目提交执行后,hadoop进入完全自动的运行方式,工作流程才开始启动。
作业的提交代码实现于JobClient类(JobClient.java)的submitJobInternal方法,分为以下几个步骤:
a.获取Job的ID
JobID jobId = jobSubmitClient.getNewJobId();
b.分配job在HDFS中的资源空间,配置其路径
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
jobCopy.set("mapreduce.job.dir", submitJobDir.toString()); //jobCopy是拷贝的一个job副本
JobStatus status = null;
c.监控地址令牌,直到可以对该空间进行操作。
TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),new Path [] {submitJobDir},jobCopy);
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
int reduces = jobCopy.getNumReduceTasks();
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
job.setJobSubmitHostAddress(ip.getHostAddress());
job.setJobSubmitHostName(ip.getHostName());
}
JobContext context = new JobContext(jobCopy, jobId);
d.检查output类型
if (reduces == 0 ? jobCopy.getUseNewMapper() :
jobCopy.getUseNewReducer()) {
org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
ReflectionUtils.newInstance(context.getOutputFormatClass(),
jobCopy);
output.checkOutputSpecs(context);
} else {
jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
}
e.为job划分splits
FileSystem fs = submitJobDir.getFileSystem(jobCopy);
LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
int maps = writeSplits(context, submitJobDir);
jobCopy.setNumMapTasks(maps);
f.在job的配置文件中写入相关的排队、配置信息
String queue = jobCopy.getQueueName();
AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);
jobCopy.set(QueueManager.toFullPropertyName(queue,QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());
g.将job的信息通知给JobTrack
FSDataOutputStream out = FileSystem.create(fs, submitJobFile,new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
f.将job写入到HDFS中。需要注意的是,写入job之前要先将cache中的数据清除。这样做的原因:cache中的数据对当前job是没有用的了,如果在job写入之前清除是没有影响的;但是,如果在写入之后清除,cache中的很可能已经被更改为其他job的信息了,清除cache中的数据会破坏其他的job。
TokenCache.cleanUpTokenReferral(jobCopy);
try {
jobCopy.writeXml(out);
} finally {
out.close();
}
g.调用submitJob类来真正的提交job
printTokens(jobId, jobCopy.getCredentials());
status = jobSubmitClient.submitJob(jobId, submitJobDir.toString(), jobCopy.getCredentials());
JobProfile prof = jobSubmitClient.getJobProfile(jobId);
if (status != null && prof != null) {
return new NetworkedJob(status, prof, jobSubmitClient);
} else {
throw new IOException("Could not launch job");
}
至此,一个作业就被完整地提交给JobTrack和HDFS了。
2、作业的初始化
作业的初始化主要是考虑到map和reduce都需要进行初始化,因此需要注意其中的细节。
作业的初始化源码实现于JobInprogress类(JobInProgress.java)的initTask方法。作业初始化的步骤如下:
a.首先需要获取job的一些配置信息和运行信息
if (!jobtracker.getConf().getBoolean(JT_JOB_INIT_EXCEPTION_OVERRIDE, false) && getJobConf().getBoolean(JOB_INIT_EXCEPTION, false)) {
waitForInitWaitLockForTests();
}
if (tasksInited || isComplete()) {
return;
}
synchronized(jobInitKillStatus){ //互锁操作
if(jobInitKillStatus.killed || jobInitKillStatus.initStarted) {
return;
}
jobInitKillStatus.initStarted = true;
}
b.获取job的执行信息和优先级信息
c.根据input splits的数目决定map的数目,每一个splits都需要一个map
TaskSplitMetaInfo[] splits = createSplits(jobId);
if (numMapTasks != splits.length) {
throw new IOException("Number of maps in JobConf doesn't match number of " +
"recieved splits for job " + jobId + "! " +
"numMapTasks=" + numMapTasks + ", #splits=" + splits.length);
}
numMapTasks = splits.length;
d.检查splits的位置,保证创建的map/reduce或者初始化的map/reduce都是有意义的,同时还需要在监控进程中设置map和reduce的信息
for (TaskSplitMetaInfo split : splits) {
NetUtils.verifyHostnames(split.getLocations());
}
jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
this.queueMetrics.addWaitingMaps(getJobID(), numMapTasks);
this.queueMetrics.addWaitingReduces(getJobID(), numReduceTasks);
e.为splits分配map
maps = new TaskInProgress[numMapTasks];
for(int i=0; i < numMapTasks; ++i) {
inputLength += splits[i].getInputDataLength();
maps[i] = new TaskInProgress(jobId, jobFile,
splits[i],
jobtracker, conf, this, i, numSlotsPerMap);
}
LOG.info("Input size for job " + jobId + " = " + inputLength+ ". Number of splits = " + splits.length);
f.将map放入到等待执行的缓冲区内
localityWaitFactor = conf.getFloat(LOCALITY_WAIT_FACTOR, DEFAULT_LOCALITY_WAIT_FACTOR);
if (numMapTasks > 0) {
nonRunningMapCache = createCache(splits, maxLevel);
}
// set the launch time
this.launchTime = jobtracker.getClock().getTime();
g.同样的对待reduce操作
h.创建两个清除进程,一个清除map,一个清除reduce
cleanup = new TaskInProgress[2];
// cleanup map tip. This map doesn't use any splits. Just assign an empty
// split.
TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf, this, numMapTasks, 1);
cleanup[0].setJobCleanupTask();
// cleanup reduce tip.
cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
numReduceTasks, jobtracker, conf, this, 1);
cleanup[1].setJobCleanupTask();
i.创建两个设置进程,分别对map和reduce进行初始化设置
setup = new TaskInProgress[2];
// setup map tip. This map doesn't use any split. Just assign an empty
// split.
setup[0] = new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf, this, numMapTasks + 1, 1);
setup[0].setJobSetupTask();
// setup reduce tip.
setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
numReduceTasks + 1, jobtracker, conf, this, 1);
setup[1].setJobSetupTask();
j.最后,设置一个互锁的方法,用来检查初始化是否成功,并写入日志
synchronized(jobInitKillStatus){
jobInitKillStatus.initDone = true;
// set this before the throw to make sure cleanup works properly
tasksInited = true;
if(jobInitKillStatus.killed) {
throw new KillInterruptedException("Job " + jobId + " killed in init");
}
}
JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, numMapTasks, numReduceTasks);
至此,作业的初始化操作就已经完成了。由于初始化操作需要根据splits数目来确定需要初始化的map和reduce操作,因此相比较有点复杂。
3、作业的分配
作业的分配需要JobTrack和TaskTrack相互协调,JobTrack根据TaskTrack发送的心跳信息来决定任务的分配方法。心跳信息会报告当前任务的状况,会提出新的任务请求,或者提出任务执行失败的报告等。JobTrack接受到信息后会采取相应的措施。
Hadoop的作业分配是一种“拉”的方式,TaskTrack发送心跳信息(源码实现为TaskTrack类(TaskTrack.java)的transmitHeartBeat方法)给JobTrack来请求一个新的任务,而这个TaskTrack提出任务请求时也会提供自己当前map/reduce任务槽的数量供JobTrack参考。JobTrack根据心跳信息(源码实现为JobTrack类(JobTrack.java)的heartbeat方法)在本地优先的情况下对该TaskTrack分配Task。
4、作业的执行
作业的执行同样在TaskTrack中完成, 在作业执行时,第一步是将任务进行本地化,第二步是通过虚拟机执行任务。
任务本地化,基于TaskTrack.java的localizeJob方法实现,主要步骤为:
a.将job.split复制到本地;b.将job.jar复制到本地;c.将job的配置信息写入job.xml;d.创建本地目录,解压缩job.jar;e.调用launchTaskForJob方法发布任务;f.调用launchTask方法启动任务。
任务的执行包括map执行(MapTaskRunner类)和reduce执行(ReduceTaskRunner类),每个任务的执行都是通过一个JVM实现的。
5、作业的进度
作业进度的更新依赖于TaskTrack向JobTrack发送的心跳信息。每个TaskTrack都会将自己的进度信息和状态信息封装在心跳信息中,每隔5秒向JobTrack发送一次。JobTrack在收集所有的信息后统一并得出全局信息。