码迷,mamicode.com
首页 > 其他好文 > 详细

Hadoop 1.x的Shuffle源码分析之2

时间:2015-05-13 08:50:45      阅读:131      评论:0      收藏:0      [点我收藏+]

标签:

ReduceTask类的内嵌类ReduceCopier的内嵌类MapOutputCopier的函数copyOutput是Shuffle里最重要的一环,它以http的方式,从远程主机取数据:创建临时文件名,然后用http读数据,再保存到内存文件系统或者本地文件系统。它读取远程文件的函数是getMapOutput。


getMapOutput函数如下:

private MapOutput getMapOutput(MapOutputLocation mapOutputLoc, 
                                     Path filename, int reduce)
      throws IOException, InterruptedException {
        //建立http链接
        URL url = mapOutputLoc.getOutputLocation();
        HttpURLConnection connection = (HttpURLConnection)url.openConnection();
        //创建输入流
        InputStream input = setupSecureConnection(mapOutputLoc, connection);

        //检查连接姿势是否正确
        int rc = connection.getResponseCode();
        if (rc != HttpURLConnection.HTTP_OK) {
          throw new IOException(
              "Got invalid response code " + rc + " from " + url +
              ": " + connection.getResponseMessage());
        }
 
        //从http链接获取mapId
        TaskAttemptID mapId = null;
        try {
          mapId =
            TaskAttemptID.forName(connection.getHeaderField(FROM_MAP_TASK));
        } catch (IllegalArgumentException ia) {
          LOG.warn("Invalid map id ", ia);
          return null;
        }
        //检查mapId是否一致
        TaskAttemptID expectedMapId = mapOutputLoc.getTaskAttemptId();
        if (!mapId.equals(expectedMapId)) {
          LOG.warn("data from wrong map:" + mapId +
              " arrived to reduce task " + reduce +
              ", where as expected map output should be from " + expectedMapId);
          return null;
        }
        //如果数据有压缩,要获取压缩长度
        long decompressedLength = 
          Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));  
        long compressedLength = 
          Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));

        if (compressedLength < 0 || decompressedLength < 0) {
          LOG.warn(getName() + " invalid lengths in map output header: id: " +
              mapId + " compressed len: " + compressedLength +
              ", decompressed len: " + decompressedLength);
          return null;
        }
        int forReduce =
          (int)Integer.parseInt(connection.getHeaderField(FOR_REDUCE_TASK));
        
        if (forReduce != reduce) {
          LOG.warn("data for the wrong reduce: " + forReduce +
              " with compressed len: " + compressedLength +
              ", decompressed len: " + decompressedLength +
              " arrived to reduce task " + reduce);
          return null;
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("header: " + mapId + ", compressed len: " + compressedLength +
              ", decompressed len: " + decompressedLength);
        }

        //We will put a file in memory if it meets certain criteria:
        //1. The size of the (decompressed) file should be less than 25% of 
        //    the total inmem fs
        //2. There is space available in the inmem fs
        
        // Check if this map-output can be saved in-memory
        boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength); 

        // Shuffle
        MapOutput mapOutput = null;
        if (shuffleInMemory) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Shuffling " + decompressedLength + " bytes (" + 
                compressedLength + " raw bytes) " + 
                "into RAM from " + mapOutputLoc.getTaskAttemptId());
          }
          //在内存做shuffle处理
          mapOutput = shuffleInMemory(mapOutputLoc, connection, input,
                                      (int)decompressedLength,
                                      (int)compressedLength);
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Shuffling " + decompressedLength + " bytes (" + 
                compressedLength + " raw bytes) " + 
                "into Local-FS from " + mapOutputLoc.getTaskAttemptId());
          }
          //在本地做shuffle处理
          mapOutput = shuffleToDisk(mapOutputLoc, input, filename, 
              compressedLength);
        }
        mapOutput.decompressedSize = decompressedLength;    
        return mapOutput;
      }











Hadoop 1.x的Shuffle源码分析之2

标签:

原文地址:http://blog.csdn.net/lizhe_dashuju/article/details/45687827

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!