码迷,mamicode.com
首页 > 其他好文 > 详细

${mapred.local.dir}选择策略--Map Task存放中间结果

时间:2014-11-26 22:37:03      阅读:293      评论:0      收藏:0      [点我收藏+]

标签:style   blog   http   io   ar   color   os   使用   sp   

  上篇说了block在DataNode配置有多个${dfs.data.dir}时的存储策略,本文主要介绍TaskTracker在配置有多个${mapred.local.dir}时的选择策略。

1 mapred-site.xml
2 <property>
3   <name>mapred.local.dir</name>
4   <value>/mnt/localdir1/local,/mnt/localdir2/local,/mnt/localdir3/local</value>
5 </property>

  当${mapred.local.dir}配置有多个目录分别用来挂载不同的硬盘时,Map Task的结果应该存放在哪个目录中?首先还是看一下方法的调用层次,如下图所示:

bubuko.com,布布扣

  下面分析这两个方法:

 1    /** Get a path from the local FS. If size is known, we go
 2      *  round-robin over the set of disks (via the configured dirs) and return
 3      *  the first complete path which has enough space.
 4      *  
 5      *  If size is not known, use roulette selection -- pick directories
 6      *  with probability proportional to their available space.
 7      */
 8     public synchronized 
 9     Path getLocalPathForWrite(String pathStr, long size, 
10                               Configuration conf, boolean checkWrite
11                               ) throws IOException {
12         //检查task目录是否有变化
13       confChanged(conf);
14       int numDirs = localDirsPath.length;    //获取${mapred.local.dir}目录的个数
15       int numDirsSearched = 0;    //表示已经搜索过的次数
16       //remove the leading slash from the path (to make sure that the uri
17       //resolution results in a valid path on the dir being checked)
18       if (pathStr.startsWith("/")) {    //是指output/spill0.out文件
19         pathStr = pathStr.substring(1);
20       }
21       Path returnPath = null;
22       Path path = new Path(pathStr);
23       
24       //当要写入的数据量大小未知时
25       if(size == SIZE_UNKNOWN) {  //do roulette selection: pick dir with probability 
26                     //proportional to available size
27         long[] availableOnDisk = new long[dirDF.length];
28         long totalAvailable = 0;
29         
30             //build the "roulette wheel"
31         for(int i =0; i < dirDF.length; ++i) { 
32             //分别计算每一个${mapred.local.dir}目录可用大小,并计算总的可用大小
33           availableOnDisk[i] = dirDF[i].getAvailable();
34           totalAvailable += availableOnDisk[i];
35         }
36 
37         // Keep rolling the wheel till we get a valid path
38         Random r = new java.util.Random();
39         while (numDirsSearched < numDirs && returnPath == null) {
40           long randomPosition = Math.abs(r.nextLong()) % totalAvailable;
41           int dir = 0;
42           while (randomPosition > availableOnDisk[dir]) {
43             randomPosition -= availableOnDisk[dir];
44             dir++;
45           }
46           dirNumLastAccessed = dir;    //表示上次访问过的目录
47           //从${mapred.local.dir}中选择一个目录,在其下创建output/spill0.out文件
48           returnPath = createPath(path, checkWrite); 
49           if (returnPath == null) { //如果创建失败(可能存在disk read-only的情况)
50             totalAvailable -= availableOnDisk[dir];
51             availableOnDisk[dir] = 0; // skip this disk
52             numDirsSearched++;
53           }
54         }
55       } else {  //写入的数据量已知
56         while (numDirsSearched < numDirs && returnPath == null) {
57           long capacity = dirDF[dirNumLastAccessed].getAvailable();
58           if (capacity > size) {
59               returnPath = createPath(path, checkWrite);
60           }
61           //使用轮流的方式来选择${mapred.local.dir}
62           dirNumLastAccessed++;
63           dirNumLastAccessed = dirNumLastAccessed % numDirs; 
64           numDirsSearched++;
65         } 
66       }
67       if (returnPath != null) {
68         return returnPath;
69       }
70       
71       //no path found
72       throw new DiskErrorException("Could not find any valid local " +
73           "directory for " + pathStr);
74     }

  confChanged(conf)方法首先检查原来的目录配置是否改变,这个下面说;然后给numDirs赋值,它表示总的${mapred.local.dir}目录个数,localDirsPath数组变量在confChanged(conf)方法中被更新了;接着在准备创建output/spill0.out文件,这个文件就是Map Task的运算结果在缓冲区写满之后spill到disk生成的文件,序号0代表序号,最后会将多个spill文件合成一个file.out文件;接下来就要选择${mapred.local.dir}目录了。其过程如下:

  1、如果要写入的数据量大小未知时:

  a) 计算dirDF数组中每个元素的剩余大小,并计算所有元素的总大小totalAvailable;

  b) (循环)生成一个Long类型随机正数,该随机数对总大小totalAvailable取余后得randomPosition。

 

(内层循环)若randomPosition > 某个disk剩余量,则randomPosition减去该disk剩余量,并与下一个disk剩余量比较……

  c) 选择了某个disk之后,如果这个disk不能创建文件,则排除这个disk,重新选择disk(总共尝试localDirsPath.length次)

 

 

  2、要写入的数据量大小已知时:将${mapred.local.dir}组织成一个数组,轮流的使用数组中的目录。dirNumLastAccessed表示上次访问过的目录;

 

  下面反过来分析下confChanged()方法。

 

  实际上该方法中的获取到的localDirs数组所代表的目录,是Map Task或Reduce Task的工作目录。因为每次不同的

 

${mapred.local.dir}选择策略--Map Task存放中间结果

标签:style   blog   http   io   ar   color   os   使用   sp   

原文地址:http://www.cnblogs.com/gwgyk/p/4124980.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!