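There are two kinds of shuffle: one stores the fetched data in memory, the other stores it in a local file. The two are almost identical.
Take the shuffle to a local file as an example: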
mapOutput = shuffleToDisk(mapOutputLoc, input, filename, compressedLength)

private MapOutput shuffleToDisk(MapOutputLocation mapOutputLoc,
                                InputStream input,
                                Path filename,
                                long mapOutputLength)
throws IOException {
  // Find out a suitable location for the output on local-filesystem
  // Output goes to the local filesystem; this is the path of the output file
  Path localFilename =
    lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(),
                                   mapOutputLength, conf);

  // Create the map output
  MapOutput mapOutput =
    new MapOutput(mapOutputLoc.getTaskId(), mapOutputLoc.getTaskAttemptId(),
                  conf, localFileSys.makeQualified(localFilename),
                  mapOutputLength);

  // Copy data to local-disk
  // Read data from input and write it to the local file;
  // input is the streaming input created from the HTTP connection
  OutputStream output = null;
  long bytesRead = 0;
  try {
    output = rfs.create(localFilename);

    byte[] buf = new byte[64 * 1024];
    int n = -1;
    try {
      n = input.read(buf, 0, buf.length);
    } catch (IOException ioe) {
      readError = true;
      throw ioe;
    }
    while (n > 0) {
      bytesRead += n;
      shuffleClientMetrics.inputBytes(n);
      output.write(buf, 0, n);

      // indicate we're making progress
      reporter.progress();
      try {
        n = input.read(buf, 0, buf.length);
      } catch (IOException ioe) {
        readError = true;
        throw ioe;
      }
    }

    LOG.info("Read " + bytesRead + " bytes from map-output for " +
             mapOutputLoc.getTaskAttemptId());

    // All data was fetched normally; close the streams
    output.close();
    input.close();
  } catch (IOException ioe) {
    LOG.info("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId(),
             ioe);

    // Discard the map-output
    try {
      mapOutput.discard();
    } catch (IOException ignored) {
      LOG.info("Failed to discard map-output from " +
               mapOutputLoc.getTaskAttemptId(), ignored);
    }
    mapOutput = null;

    // Close the streams
    IOUtils.cleanup(LOG, input, output);

    // Re-throw
    throw ioe;
  }

  // Sanity check
  // Check that the read completed correctly
  if (bytesRead != mapOutputLength) {
    try {
      mapOutput.discard();
    } catch (Exception ioe) {
      // IGNORED because we are cleaning up
      LOG.info("Failed to discard map-output from " +
               mapOutputLoc.getTaskAttemptId(), ioe);
    } catch (Throwable t) {
      String msg = getTaskID() + " : Failed in shuffle to disk :"
                   + StringUtils.stringifyException(t);
      reportFatalError(getTaskID(), t, msg);
    }
    mapOutput = null;

    throw new IOException("Incomplete map output received for " +
                          mapOutputLoc.getTaskAttemptId() + " from " +
                          mapOutputLoc.getOutputLocation() + " (" +
                          bytesRead + " instead of " +
                          mapOutputLength + ")"
                         );
  }

  return mapOutput;
}
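Stripped of the Hadoop-specific pieces, the listing boils down to three steps: copy the stream in 64 KB chunks, throw away the partial output if anything goes wrong, and compare the number of bytes read with the expected length. Below is a minimal standalone sketch of that pattern using only the JDK. It is not Hadoop code: the class ShuffleCopySketch and the methods copy, copyToDisk and copyToMemory are made-up names for illustration, and copyToMemory is only a guess at what the in-memory variant might look like, based on the remark that the two are almost identical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical illustration only; none of these names exist in Hadoop.
final class ShuffleCopySketch {

    // Same chunk size as the buffer in the listing above.
    private static final int BUFFER_SIZE = 64 * 1024;

    // Copies input to output chunk by chunk and returns the number of bytes copied.
    static long copy(InputStream input, OutputStream output) throws IOException {
        byte[] buf = new byte[BUFFER_SIZE];
        long bytesRead = 0;
        int n;
        while ((n = input.read(buf, 0, buf.length)) > 0) {
            output.write(buf, 0, n);
            bytesRead += n;
        }
        return bytesRead;
    }

    // Disk variant: writes to target and deletes it on an I/O error or a length mismatch.
    static long copyToDisk(InputStream input, Path target, long expectedLength)
            throws IOException {
        long bytesRead;
        try (InputStream in = input; OutputStream out = Files.newOutputStream(target)) {
            bytesRead = copy(in, out);
        } catch (IOException ioe) {
            // Both streams are already closed here; discard the partial file and re-throw.
            Files.deleteIfExists(target);
            throw ioe;
        }
        // Sanity check, mirroring the bytesRead != mapOutputLength test.
        if (bytesRead != expectedLength) {
            Files.deleteIfExists(target);
            throw new IOException("Incomplete output received (" + bytesRead
                    + " instead of " + expectedLength + ")");
        }
        return bytesRead;
    }

    // Assumed in-memory variant: same loop and check, but the bytes stay on the heap.
    static byte[] copyToMemory(InputStream input, long expectedLength) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long bytesRead;
        try (InputStream in = input) {
            bytesRead = copy(in, out);
        }
        if (bytesRead != expectedLength) {
            throw new IOException("Incomplete output received (" + bytesRead
                    + " instead of " + expectedLength + ")");
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = new byte[200_000];
        Path target = Files.createTempFile("map-output", ".tmp");
        long written = copyToDisk(new ByteArrayInputStream(payload), target, payload.length);
        System.out.println("Wrote " + written + " bytes to " + target);
        Files.deleteIfExists(target);
    }
}
```

The sketch uses try-with-resources where the original closes the streams by hand with IOUtils.cleanup, and it leaves out progress reporting and metrics, which only make sense inside a running reduce task.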
Java code is very direct, with nothing convoluted about it. Apart from being a little verbose, it really has no drawbacks :)
Original article: http://blog.csdn.net/lizhe_dashuju/article/details/45700135