前言
首先确保已经搭建好Hadoop集群环境,可以参考《Linux下Hadoop集群环境的搭建》一文的内容。我在测试mapreduce任务时,发现相比于使用Job.setNumReduceTasks(int)控制reduce任务数量而言,控制map任务数量一直是一个困扰我的问题。好在经过很多摸索与实验,终于梳理出来,希望对在工作中进行Hadoop进行性能调优的新人们有个借鉴。本文只针对FileInputFormat的任务划分进行分析,其它类型的InputFormat的划分方式又各有不同。虽然如此,都可以按照本文类似的方法进行分析和总结。
为了简便起见,本文以Hadoop2.6.0自带的word count例子为例,进行展开。
wordcount
我们首先准备好wordcount所需的数据,一共有两份文件,都位于hdfs的/wordcount/input目录下:
这两个文件的内容分别为:
On the top of the Crumpretty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
和
But his face you could not see,
On account of his Beaver Hat.
有关如何操作hdfs并准备好数据的细节,本文不作赘述。
现在我们不作任何性能优化(不增加任何配置参数),然后执行hadoop-mapreduce-examples子项目(有关此项目介绍,可以阅读《Hadoop2.6.0子项目hadoop-mapreduce-examples的简单介绍》一文)中自带的wordcount例子:
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6..jar wordcount /wordcount/input /wordcount/output/result1
当然也可以使用朴素的方式运行wordcount例子:
hadoop org.apache.hadoop.examples.WordCount -D mapreduce.input.fileinputformat.split.maxsize= /wordcount/input /wordcount/output/result1
最后执行的结果在hdfs的/wordcount/output/result1目录下:
执行结果可以查看/wordcount/output/result1/part-r-00000的内容:
第一次优化
wordcount例子,查看运行结果不是本文的目的。在执行wordcount例子时,在任务运行信息中可以看到创建的map及reduce任务的数量:
可以看到FileInputFormat的输入文件有2个,JobSubmitter任务划分的数量是2,最后产生的map任务数量也是2,看到这我们可以猜想由于我们提供了两个输入文件,所以会有2个map任务。我们此处姑且不论这种猜测正确与否,现在我们打算改变map任务的数量。通过查看文档,很多人知道使用mapreduce.job.maps参数可以快速修改map任务的数量,事实果真如此?让我们先来实验一番,输入以下命令:
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6..jar wordcount -D mapreduce.job.maps= /wordcount/input /wordcount/output/result2
执行以上命令后,观察输出的信息,与之前未添加mapreduce.job.maps参数的输出信息几乎没有变化。难道Hadoop的实现人员开了一个玩笑,亦或者这是一个bug?我们先给这个问题在我们的大脑中设置一个检查点,最后再来看看究竟是怎么回事。
第二次优化
用mapreduce.job.maps调整map任务数量没有见效,我们翻翻文档,发现还有mapreduce.input.fileinputformat.split.minsize参数,它可以控制map任务输入划分的最小字节数。这个参数和mapreduce.input.fileinputformat.split.maxsize通常配合使用,后者控制map任务输入划分的最大字节数。我们目前只调整mapreduce.input.fileinputformat.split.minsize的大小,划分最小的尺寸变小是否预示着任务划分数量变多?来看看会发生什么?输入以下命令:
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6..jar wordcount -D mapreduce.input.fileinputformat.split.minsize= /wordcount/input /wordcount/output/result3
执行以上命令后,观察输出信息,依然未发生改变。好吧,弟弟不给力,我们用它的兄弟参数mapreduce.input.fileinputformat.split.maxsize来控制。如果我们将mapreduce.input.fileinputformat.split.maxsize改得很小,会怎么样?输入以下命令:
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6..jar wordcount -D mapreduce.input.fileinputformat.split.maxsize= /wordcount/input /wordcount/output/result4
这是的信息有了改变,我们似乎取得了想要的结果:
呵呵,任务划分成了177个,想想也是,我们把最大的划分字节数仅仅设置为1字节。接着往下看确实执行了177个map任务:
我们还可以通过Web UI观察map任务所分配的Container。首先查看Slave1节点上分配的Container情况:
再来看看Slave2节点上分配的Container情况:
确实说明最多有15个Container分配给当前作业执行map任务。由于在YARN中yarn.nodemanager.resource.cpu-vcores参数的默认值是8,所以Slave1和Slave2两台机器上的虚拟cpu总数是16,由于ResourceManager会为mapreduce任务分配一个Container给ApplicationMaster(即MrAppMaster),所以整个集群只剩余了15个Container用于ApplicationMaster向NodeManager申请和运行map任务。
第三次优化
阅读文档我们知道dfs.blocksize可以控制块的大小,看看这个参数能否发挥作用。为便于测试,我们首先需要修改hdfs-site.xml中dfs.blocksize的大小为10m(最小就只能这么小,Hadoop限制了参数单位至少是10m)。
<property>
<name>dfs.blocksize</name>
<value>10m</value>
</property>
然后,将此配置复制到集群的所有NameNode和DataNode上。为了使此配置在不重启的情况下生效,在NameNode节点上执行以下命令:
hadoop dfsadmin -refreshNodes
yarn rmadmin -refreshNodes
我们使用以下命令查看下系统内的文件所占用的blocksize大小:
hadoop dfs -stat "%b %n %o %r %y" /wordcount/input/quangle*
输出结果如下:
可以看到虽然quangle.txt和quangle2.txt的字节数分别是121字节和56字节,但是在hdfs中这两个文件的blockSize已经是10m了。现在我们试试以下命令:
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6..jar wordcount /wordcount/input /wordcount/output/result5
观察输出信息,发现没有任何效果。
源码分析
经过以上3次不同实验,发现只有mapreduce.input.fileinputformat.split.maxsize参数确实影响了map任务的数量。现在我们通过源码分析,来一探究竟吧。
首先我们看看WordCount例子的源码,其中和任务划分有关的代码如下:
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
我们看到使用的InputFormat是FileOutputFormat,任务执行调用了Job的waitForCompletion方法。waitForCompletion方法中真正提交job的代码如下:
public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException {
if (state == JobState.DEFINE) {
submit();
}
// 省略本文不关心的代码
return isSuccessful();
}
这里的submit方法的实现如下:
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
// 省略本文不关心的代码</span>
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
return submitter.submitJobInternal(Job.this, cluster);
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
submit方法首先创建了JobSubmitter实例,然后异步调用了JobSubmitter的submitJobInternal方法。JobSubmitter的submitJobInternal方法有关划分任务的代码如下:
// Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
int maps = writeSplits(job, submitJobDir);
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);
writeSplits方法的实现如下:
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
JobConf jConf = (JobConf)job.getConfiguration();
int maps;
if (jConf.getUseNewMapper()) {
maps = writeNewSplits(job, jobSubmitDir);
} else {
maps = writeOldSplits(jConf, jobSubmitDir);
}
return maps;
}
由于WordCount使用的是新的mapreduce API,所以最终会调用writeNewSplits方法。writeNewSplits的实现如下:
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf); List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]); // sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
}
writeNewSplits方法中,划分任务数量最关键的代码即为InputFormat的getSplits方法(提示:大家可以直接通过此处的调用,查看不同InputFormat的划分任务实现)。根据前面的分析我们知道此时的InputFormat即为FileOutputFormat,其getSplits方法的实现如下:
public List<InputSplit> getSplits(JobContext job) throws IOException {
Stopwatch sw = new Stopwatch().start();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job); // generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
} if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.elapsedMillis());
}
return splits;
}
getFormatMinSplitSize方法固定返回1,getMinSplitSize方法实际就是mapreduce.input.fileinputformat.split.minsize参数的值(默认为1),那么变量minSize的大小为mapreduce.input.fileinputformat.split.minsize与1之间的最大值。
getMaxSplitSize方法实际是mapreduce.input.fileinputformat.split.maxsize参数的值,那么maxSize即为mapreduce.input.fileinputformat.split.maxsize参数的值。
由于我的试验中有两个输入源文件,所以List<FileStatus> files = listStatus(job);方法返回的files列表的大小为2。
在遍历files列表的过程中,会获取每个文件的blockSize,最终调用computeSplitSize方法计算每个输入文件应当划分的任务数。computeSplitSize方法的实现如下:
protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
因此我们知道每个输入文件被划分的公式如下:
map任务要划分的大小(splitSize )=(maxSize与blockSize之间的最小值)与minSize之间的最大值
bytesRemaining 是单个输入源文件未划分的字节数
根据getSplits方法,我们知道map任务划分的数量=输入源文件数目 * (bytesRemaining / splitSize个划分任务+bytesRemaining不能被splitSize 整除的剩余大小单独划分一个任务 )
总结
根据源码分析得到的计算方法和之前的优化结果,我们最后总结一下:
对于第一次优化,由于FileOutputFormat压根没有采用mapreduce.job.maps参数指定的值,所以它当然不会有任何作用。
对于第二次优化,minSize几乎由mapreduce.input.fileinputformat.split.minsize控制;mapreduce.input.fileinputformat.split.maxsize默认的大小是Long.MAX_VALUE,所以blockSize即为maxSize与blockSize之间的最小值;blockSize的默认大小是128m,所以blockSize与值为1的mapreduce.input.fileinputformat.split.minsize之间的最大值为blockSize,即map任务要划分的大小的大小与blockSize相同。
对于第三次优化,虽然我们将blockSize设置为10m(最小也只能这么小了,hdfs对于block大小的最低限制),根据以上公式maxSize与blockSize之间的最小值必然是blockSize,而blockSize与minSize之间的最大值也必然是blockSize。说明blockSize实际上已经发挥了作用,它决定了splitSize的大小就是blockSize。由于blockSize大于bytesRemaining,所以并没有对map任务数量产生影响。
针对以上分析,我们用更加容易理解的方式列出这些配置参数的关系:
当mapreduce.input.fileinputformat.split.maxsize > mapreduce.input.fileinputformat.split.minsize > dfs.blockSize的情况下,此时的splitSize 将由mapreduce.input.fileinputformat.split.minsize参数决定。
当mapreduce.input.fileinputformat.split.maxsize > dfs.blockSize > mapreduce.input.fileinputformat.split.minsize的情况下,此时的splitSize 将由dfs.blockSize配置决定。(第二次优化符合此种情况)
当dfs.blockSize > mapreduce.input.fileinputformat.split.maxsize > mapreduce.input.fileinputformat.split.minsize的情况下,此时的splitSize 将由mapreduce.input.fileinputformat.split.maxsize参数决定。
鸣谢
我在试验的过程中,遇到很多问题。但是很多问题在网络上都能找到,特此感谢在互联网上分享经验的同仁们。
后记:个人总结整理的《深入理解Spark:核心思想与源码分析》一书现在已经正式出版上市,目前京东、当当、天猫等网站均有销售,欢迎感兴趣的同学购买。
京东(现有满150减50活动)):http://item.jd.com/11846120.html
当当:http://product.dangdang.com/23838168.html