${mapred.local.dir}选择策略--Map Task存放中间结果

  上篇说了block在DataNode配置有多个${dfs.data.dir}时的存储策略,本文主要介绍TaskTracker在配置有多个${mapred.local.dir}时的选择策略。

 mapred-site.xml
<property>
<name>mapred.local.dir</name>
<value>/mnt/localdir1/local,/mnt/localdir2/local,/mnt/localdir3/local</value>
</property>

  当${mapred.local.dir}配置有多个目录分别用来挂载不同的硬盘时,Map Task的结果应该存放在哪个目录中?首先还是看一下方法的调用层次,如下图所示:

${mapred.local.dir}选择策略--Map Task存放中间结果

  下面分析这两个方法:

    /** Get a path from the local FS. If size is known, we go
* round-robin over the set of disks (via the configured dirs) and return
* the first complete path which has enough space.
*
* If size is not known, use roulette selection -- pick directories
* with probability proportional to their available space.
*/
public synchronized
Path getLocalPathForWrite(String pathStr, long size,
Configuration conf, boolean checkWrite
) throws IOException {
//检查task目录是否有变化
confChanged(conf);
int numDirs = localDirsPath.length; //获取${mapred.local.dir}目录的个数
int numDirsSearched = 0; //表示已经搜索过的次数
//remove the leading slash from the path (to make sure that the uri
//resolution results in a valid path on the dir being checked)
if (pathStr.startsWith("/")) { //是指output/spill0.out文件
pathStr = pathStr.substring(1);
}
Path returnPath = null;
Path path = new Path(pathStr); //当要写入的数据量大小未知时
if(size == SIZE_UNKNOWN) { //do roulette selection: pick dir with probability
//proportional to available size
long[] availableOnDisk = new long[dirDF.length];
long totalAvailable = 0; //build the "roulette wheel"
for(int i =0; i < dirDF.length; ++i) {
//分别计算每一个${mapred.local.dir}目录可用大小,并计算总的可用大小
availableOnDisk[i] = dirDF[i].getAvailable();
totalAvailable += availableOnDisk[i];
} // Keep rolling the wheel till we get a valid path
Random r = new java.util.Random();
while (numDirsSearched < numDirs && returnPath == null) {
long randomPosition = Math.abs(r.nextLong()) % totalAvailable;
int dir = 0;
while (randomPosition > availableOnDisk[dir]) {
randomPosition -= availableOnDisk[dir];
dir++;
}
dirNumLastAccessed = dir; //表示上次访问过的目录
//从${mapred.local.dir}中选择一个目录,在其下创建output/spill0.out文件
returnPath = createPath(path, checkWrite);
if (returnPath == null) { //如果创建失败(可能存在disk read-only的情况)
totalAvailable -= availableOnDisk[dir];
availableOnDisk[dir] = 0; // skip this disk
numDirsSearched++;
}
}
} else { //写入的数据量已知
while (numDirsSearched < numDirs && returnPath == null) {
long capacity = dirDF[dirNumLastAccessed].getAvailable();
if (capacity > size) {
returnPath = createPath(path, checkWrite);
}
//使用轮流的方式来选择${mapred.local.dir}
dirNumLastAccessed++;
dirNumLastAccessed = dirNumLastAccessed % numDirs;
numDirsSearched++;
}
}
if (returnPath != null) {
return returnPath;
} //no path found
throw new DiskErrorException("Could not find any valid local " +
"directory for " + pathStr);
}

  confChanged(conf)方法首先检查原来的目录配置是否改变,这个下面说;然后给numDirs赋值,它表示总的${mapred.local.dir}目录个数,localDirsPath数组变量在confChanged(conf)方法中被更新了;接着在准备创建output/spill0.out文件,这个文件就是Map Task的运算结果在缓冲区写满之后spill到disk生成的文件,序号0代表序号,最后会将多个spill文件合成一个file.out文件;接下来就要选择${mapred.local.dir}目录了。其过程如下:

  1、如果要写入的数据量大小未知时:

  a) 计算dirDF数组中每个元素的剩余大小,并计算所有元素的总大小totalAvailable;

  b) (循环)生成一个Long类型随机正数,该随机数对总大小totalAvailable取余后得randomPosition。

       (内层循环)若randomPosition > 某个disk剩余量,则randomPosition减去该disk剩余量,并与下一个disk剩余量比较……

  c) 选择了某个disk之后,如果这个disk不能创建文件,则排除这个disk,重新选择disk(总共尝试localDirsPath.length次)

  2、要写入的数据量大小已知时:将${mapred.local.dir}组织成一个数组,轮流的使用数组中的目录。dirNumLastAccessed表示上次访问过的目录;

  下面反过来分析下confChanged()方法。

  实际上该方法中的获取到的localDirs数组所代表的目录,是Map Task或Reduce Task的工作目录(即attempt_jobid_taskid_m_attemptid*)。因为每次不同的Task会使用不同的工作目录。所以每次不同的Task来read/write数据时,该方法都会为他们构造工作目录。具体代码如下:

 /** This method gets called everytime before any read/write to make sure
* that any change to localDirs is reflected immediately.
*/
private synchronized void confChanged(Configuration conf
) throws IOException {
//contextCfgItemName="mapred.local.dir"
String newLocalDirs = conf.get(contextCfgItemName);
if (!newLocalDirs.equals(savedLocalDirs)) { //savedLocalDirs代表上个task的工作目录
String[] localDirs = conf.getStrings(contextCfgItemName);
localFS = FileSystem.getLocal(conf);
int numDirs = localDirs.length; //${mapred.local.dir}目录的个数
ArrayList<String> dirs = new ArrayList<String>(numDirs);
ArrayList<DF> dfList = new ArrayList<DF>(numDirs);
for (int i = 0; i < numDirs; i++) {
try {
// filter problematic directories
Path tmpDir = new Path(localDirs[i]);
//检查task的工作目录(attempt....)是否存在,如果不存在,则新建
if(localFS.mkdirs(tmpDir)|| localFS.exists(tmpDir)) {
try {
DiskChecker.checkDir(new File(localDirs[i]));
dirs.add(localDirs[i]);
dfList.add(new DF(new File(localDirs[i]), 30000));
} catch (DiskErrorException de) {
LOG.warn( localDirs[i] + "is not writable\n" +
StringUtils.stringifyException(de));
}
} else {
LOG.warn( "Failed to create " + localDirs[i]);
}
} catch (IOException ie) {
LOG.warn( "Failed to create " + localDirs[i] + ": " +
ie.getMessage() + "\n" + StringUtils.stringifyException(ie));
} //ignore
}
localDirsPath = new Path[dirs.size()];
for(int i=0;i<localDirsPath.length;i++) {
localDirsPath[i] = new Path(dirs.get(i));
}
dirDF = dfList.toArray(new DF[dirs.size()]);
savedLocalDirs = newLocalDirs; //保存此次的task工作目录 // randomize the first disk picked in the round-robin selection
//因为该task所有的工作目录都遍历过了,所以随机选择一个目录作为最后访问过的目录
dirNumLastAccessed = dirIndexRandomizer.nextInt(dirs.size());
}
}

  上面代码中的localDirsPath变量的内容如下所示:

/mapred/local/dir1/taskTracker/hadoop/jobcache/job_local1424926029_0001/attempt_local1424926029_0001_m_000000_0

/mapred/local/dir2/taskTracker/hadoop/jobcache/job_local1424926029_0001/attempt_local1424926029_0001_m_000000_0

/mapred/local/dir3/taskTracker/hadoop/jobcache/job_local1424926029_0001/attempt_local1424926029_0001_m_000000_0

  可以看到,这些路径中就只有${mapred.local.dir}不同,其下的目录结构都完全一样。

  说一下Task的工作目录。TaskTracker会在${mapred.local.dir}下生成相同的目录结构用来存放Map Task处理的结果数据,然后在Job完成时清理掉这些数据和目录。

  Task的工作目录就是指:${mapred.local.dir}/taskTracker/${user}/jobcache/jobID/taskID目录。在这个目录下的output文件夹中就存放着Map Task的结果,并以上述方式使用这些目录。

  才开始时,output目录下只有spill0.out文件(0代表序号),之后可能会产生多个spill文件。当Map Task执行完毕后会把所有属于该Task(即同一个taskid目录下)的spill文件合并成file.out文件。

  

  变量dirDF代表了一个DF数组,DF类代表了disk的使用情况(使用"df -k"命令得到),包含的属性如下:

 /**
* Filesystem disk space usage statistics. Uses the unix 'df' program to get
* mount points, and java.io.File for space utilization. Tested on Linux,
* FreeBSD, Cygwin.
*/
public class DF extends Shell { private final String dirPath;
private final File dirFile;
private String filesystem;
private String mount;

  分析完写数据的部分后,读数据的部分就很简单了。使用getLocalPathToRead()方法,从整个${mapred.local.dir}/taskTracker/${user}/jobcache/jobID/taskID中寻找所需要的文件,找到后返回其路径信息即可。

  ${mapred.local.dir}的选择策略也有以下问题:

1、disk是只读的

2、Disk没有足够空间了(多个线程共享disk)

  本文基于hadoop1.2.1

  如有错误,还请指正

  转载请注明出处:http://www.cnblogs.com/gwgyk/p/4124980.html

随机推荐

  1. 使用后缀数组寻找最长公共子字符串JavaScript版

    后缀数组很久很久以前就出现了,具体的概念读者自行搜索,小菜仅略知一二,不便讨论. 本文通过寻找两个字符串的最长公共子字符串,演示了后缀数组的经典应用. 首先需要说明,小菜实现的这个后缀数组算法,并非标 ...

  2. centos克隆,网卡启动失败

    情形:提示 Device eth0 does not seem to be present 步骤: 1.vmware 克隆,选择full clone 2.启动克隆后的系统,修改ifcfg-eth0,/ ...

  3. 深入理解计算机系统-从书中看到了异或交换ab两个值的新感

    还得从一个很经典的面试题说起:不通过第三个变量来交换两个变量a,b的值... 一个很经典的答案是通过异或来解决: 第壹步:a=a^b; 第贰步:b=a^b; 第叁步:a=a^b; 以前提起" ...

  4. gcc 安装

    最近在中标麒麟上面工作,结果发现上面gcc都没有,没有办法只好自己装(PS 中标麒麟:怪我咯) 资源:http://download.csdn.net/detail/jiahuat/8715413 按 ...

  5. 泛型类、Map集合

    ————泛型: JDK1.5之后出现的新特性:用于解决安全问题,是一个类型安全机制. 好处: 1.将运行时期出现的问题ClassCastException ,转移到了编译时期,方便于程序员解决问题,让 ...

  6. 局域网里连接mysql服务器,其他人连接自己的mysql服务器

    应用场景:  自己在自己的机器上开发网站,同事也要和我一起开发,就两个人,我自己的机器当做服务器,让他直接连我的数据库,看我的项目就行了,并且用svn进行开发,相当不错 问题: 怎样在局域网里,其他人 ...

  7. PCL-安装

    1.安装定期更新维护的PCL开发包. 通过PPA支持的Ubuntu系统,安装命令为: sudo add-apt-repository ppa:v-launched-jochen-sprickerhof ...

  8. MFC 的资源文件 就是那个后缀名是 &period;rc的那个

    参考: http://blog.csdn.net/zgrjkflmkyc/article/details/16897881 http://www.oschina.net/question/565065 ...

  9. apache rewrite 规则

    啥是虚拟主机呢?就是说把你自己的本地的开发的机子变成一个虚拟域名,比如:你在开发pptv下面的一个项目 127.0.0.1/pptv_trunk,你想把自己的机器域名变成www.pptv.com.那么 ...

  10. HDU 4497 GCD and LCM (分解质因数)

    链接 :  http://acm.hdu.edu.cn/showproblem.php?pid=4497 假设G不是L的约数 就不可能找到三个数. L的全部素因子一定包括G的全部素因子 而且次方数 ...

上一篇:IBM Python 技术专题


下一篇:C#代码中如何比较两个日期的大小?