前言 :本文旨在理清在Hadoop中一个MapReduce作业(Job)在提交到框架后的整个生命周期过程,权作总结和日后参考,如有问题,请不吝赐教。本文不涉及Hadoop的架构设计,如有兴趣请参考相关书籍和文献。在梳 理过程中,我对一些感兴趣的源码也会逐行研究学习,以期强化基础。
作者
:Jaytalent
开始日期
:2013年9月9日参考资料:【1】《Hadoop技术内幕--深入解析MapReduce架构设计与实现原理》董西成【2】Hadoop 1.0.0 源码
【3】《Hadoop技术内幕--深入解析Hadoop Common和HDFS架构设计与实现原理》蔡斌 陈湘萍
【1】:
作业提交与初始化
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
JobID jobId = jobSubmitClient.getNewJobId();
作业ID时从JobTracker获取的,这是一次RPC调用,方法为getNewJobId,定义在JobSubmissionProtocol接口。
private JobSubmissionProtocol jobSubmitClient;
/**
* Allocate a name for the job.
* @return a unique job name for submitting jobs.
* @throws IOException
*/
public JobID getNewJobId() throws IOException;
用户使用该协议通过JobTracker提交作业,查看作业状态等。
copyAndConfigureFiles(jobCopy, submitJobDir);
在配置了提交副本数(mapred.submit.replication,默认为10)等信息后,主要代码分析如下(为了清晰起见,省略了一些日志和异常处理):
// Retrieve command line arguments placed into the JobConf
// by GenericOptionsParser.
String files = job.get("tmpfiles");
String libjars = job.get("tmpjars");
String archives = job.get("tmparchives");
首先,从配置中获取不同类型文件的名称和路径,这些配置在作业提交时从命令行(Hadoop Shell)指定。files表示作业依赖的普通文件,比如文本文件;libjars表示应用程序依赖的第三方jar包;archives表示应用程序使用的多个文件打包而成的压缩文件。
// Create a number of filenames in the JobTracker's fs namespace
FileSystem fs = submitJobDir.getFileSystem(job);
submitJobDir = fs.makeQualified(submitJobDir);
FsPermission mapredSysPerms = new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
FileSystem.mkdirs(fs, submitJobDir, mapredSysPerms);
Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
接下来,在JobTracker的文件系统(通常为HDFS)的命名空间创建一系列文件路径名,其中包括前述三种文件类型。
有了路径名后,在HDFS上创建路径并将这些文件拷贝到对应的目录中,代码如下:
// add all the command line files/ jars and archive
// first copy them to jobtrackers filesystem if (files != null) {
FileSystem.mkdirs(fs, filesDir, mapredSysPerms);
String[] fileArr = files.split(",");
for (String tmpFile: fileArr) {
URI tmpURI;
tmpURI = new URI(tmpFile); Path tmp = new Path(tmpURI);
Path newPath = copyRemoteFiles(fs,filesDir, tmp, job, replication);
URI pathURI = getPathURI(newPath, tmpURI.getFragment());
DistributedCache.addCacheFile(pathURI, job);
DistributedCache.createSymlink(job);
}
}
if (libjars != null) {
FileSystem.mkdirs(fs, libjarsDir, mapredSysPerms);
String[] libjarsArr = libjars.split(",");
for (String tmpjars: libjarsArr) {
Path tmp = new Path(tmpjars);
Path newPath = copyRemoteFiles(fs, libjarsDir, tmp, job, replication);
DistributedCache.addArchiveToClassPath
(new Path(newPath.toUri().getPath()), job, fs);
}
}
if (archives != null) {
FileSystem.mkdirs(fs, archivesDir, mapredSysPerms);
String[] archivesArr = archives.split(",");
for (String tmpArchives: archivesArr) {
URI tmpURI;
tmpURI = new URI(tmpArchives);
Path tmp = new Path(tmpURI);
Path newPath = copyRemoteFiles(fs, archivesDir, tmp, job, replication);
URI pathURI = getPathURI(newPath, tmpURI.getFragment());
DistributedCache.addCacheArchive(pathURI, job);
DistributedCache.createSymlink(job);
}
注意,MapReduce作业文件的上传和下载是通过DistributedCache工具完成的,它是一个数据分发工具。用户指定的文件会被分发到各个TaskTracker上以运行Task。这里暂不涉及该工具的细节,留待日后讨论。
String originalJarPath = job.getJar();
if (originalJarPath != null) { // copy jar to JobTracker's fs
// use jar name if job is not named.
if ("".equals(job.getJobName())){
job.setJobName(new Path(originalJarPath).getName());
}
Path submitJarFile = JobSubmissionFiles.getJobJar(submitJobDir);
job.setJar(submitJarFile.toString());
fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);
fs.setReplication(submitJarFile, replication);
fs.setPermission(submitJarFile,
new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
}
注意,在每次上传一种类型的文件后,都会将这种文件的路径配置到JobConf对象中,具体的工作由
DistributedCache.addCacheFile(pathURI, job);
DistributedCache.addArchiveToClassPath(new Path(newPath.toUri().getPath()), job, fs);
DistributedCache.addCacheArchive(pathURI, job);
job.setJar(submitJarFile.toString());
这四行代码完成。顺便提一句,Path类Hadoop文件系统在java.net.URI的基础上抽象了文件系统中的路径 【3】 。Java的File类和URL类分别抽象了不同的事物,Path可以说将二者统一起来。
3. 生成InputSplit文件
// Create the splits for the job
FileSystem fs = submitJobDir.getFileSystem(jobCopy);
int maps = writeSplits(context, submitJobDir);
jobCopy.setNumMapTasks(maps);
jobCopy是一个JobConf对象。其中,writeSplits方法会实际调用InputSplit.getSplits方法生成splits信息,并将splits原始信息和元信息写入HDFS对应的目录和文件中。有关split的生成过程日后研究,这里不展开了。最后,将作业对应的JobConf对象以XML配置文件形式写入到HDFS中:
// Write job file to JobTracker's fs
FSDataOutputStream out =
FileSystem.create(fs, submitJobFile,
new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION)); try {
jobCopy.writeXml(out);
} finally {
out.close();
}
至此,作业文件上传才算正式完毕。
接下来,作业将被提交到JobTracker,请关注下篇文章: