Hadoop CombineFileInputFormat: Principles and Source Code Analysis

Introduction
Before split formation begins, three important mappings need to be initialized:

```java
// mapping from a rack name to the list of blocks it has
HashMap<String, List<OneBlockInfo>> rackToBlocks = new HashMap<String, List<OneBlockInfo>>();

// mapping from a block to the nodes on which it has replicas
HashMap<OneBlockInfo, String[]> blockToNodes = new HashMap<OneBlockInfo, String[]>();

// mapping from a node to the list of blocks that it contains
HashMap<String, List<OneBlockInfo>> nodeToBlocks = new HashMap<String, List<OneBlockInfo>>();
```
These mappings are populated while iterating over all input files; the total input length is accumulated at the same time:

```java
// populate all the blocks for all files
long totLength = 0;
for (int i = 0; i < paths.length; i++) {
  files[i] = new OneFileInfo(paths[i], job, rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes);
  totLength += files[i].getLength();
}
```
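To make these structures concrete, here is a small illustrative sketch. This is not Hadoop code: plain strings stand in for OneBlockInfo (a private inner class), and the layout (blocks b1..b3, nodes n1/n2, a single rack r1) is invented for the example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

public class MappingSketch {
  public static void main(String[] args) {
    // rack r1 holds all three blocks
    HashMap<String, List<String>> rackToBlocks = new HashMap<String, List<String>>();
    rackToBlocks.put("r1", Arrays.asList("b1", "b2", "b3"));

    // b1 is replicated on both nodes; b2 and b3 have a single replica each
    HashMap<String, String[]> blockToNodes = new HashMap<String, String[]>();
    blockToNodes.put("b1", new String[] { "n1", "n2" });
    blockToNodes.put("b2", new String[] { "n1" });
    blockToNodes.put("b3", new String[] { "n2" });

    // the node-side view of the same replica placement
    HashMap<String, List<String>> nodeToBlocks = new HashMap<String, List<String>>();
    nodeToBlocks.put("n1", Arrays.asList("b1", "b2"));
    nodeToBlocks.put("n2", Arrays.asList("b1", "b3"));
  }
}
```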
Split formation proper then begins. The first pass iterates over nodeToBlocks and creates splits whose blocks are all local to a single node:

```java
// holds the blocks that make up the split being formed
ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
// holds the nodes that the blocks of the current split belong to
ArrayList<String> nodes = new ArrayList<String>();
// size of the split accumulated so far
long curSplitSize = 0;

// process all nodes and create splits that are local to a node.
// Handle each node's blocks in turn.
for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();
     iter.hasNext();) {
  Map.Entry<String, List<OneBlockInfo>> one = iter.next();
  nodes.add(one.getKey());
  List<OneBlockInfo> blocksInNode = one.getValue();

  // for each block, copy it into validBlocks. Delete it from blockToNodes
  // so that the same block does not appear in two different splits.
  // Note the role of blockToNodes: it guarantees that a block never ends up
  // in two splits.
  for (OneBlockInfo oneblock : blocksInNode) {
    if (blockToNodes.containsKey(oneblock)) {
      validBlocks.add(oneblock);
      blockToNodes.remove(oneblock);
      curSplitSize += oneblock.length;

      // if the accumulated size reaches maxSize, then create this split.
      if (maxSize != 0 && curSplitSize >= maxSize) {
        // create an input split and add it to the splits array
        addCreatedSplit(job, splits, nodes, validBlocks);
        curSplitSize = 0;
        validBlocks.clear();
      }
    }
  }

  // if there were any blocks left over and their combined size is
  // at least minSizeNode, then combine them into one split.
  // Otherwise add them back to the unprocessed pool. It is likely
  // that they will be combined with other blocks from the same rack later on.
  if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
    // create an input split and add it to the splits array
    addCreatedSplit(job, splits, nodes, validBlocks);
  } else {
    // leftover smaller than minSizeNode: return the blocks to blockToNodes
    // so that the rack-local pass below can pick them up
    for (OneBlockInfo oneblock : validBlocks) {
      blockToNodes.put(oneblock, oneblock.hosts);
    }
  }
  validBlocks.clear();
  nodes.clear();
  curSplitSize = 0;
}
```
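The accumulate-and-cut behaviour of this loop is easier to see in isolation. The following self-contained sketch is not Hadoop code; it uses made-up thresholds and block sizes, and simplifies to a single node. Blocks accumulate until maxSize is reached, and a leftover smaller than minSizeNode is pushed back for the rack pass:

```java
import java.util.ArrayList;
import java.util.List;

public class NodeLocalPassSketch {
  public static void main(String[] args) {
    long maxSize = 256L;      // hypothetical maximum split size
    long minSizeNode = 64L;   // hypothetical per-node minimum
    long[] blocks = { 128L, 128L, 32L }; // block sizes on one node

    List<Long> currentSplit = new ArrayList<Long>();
    long curSplitSize = 0;
    for (long b : blocks) {
      currentSplit.add(b);
      curSplitSize += b;
      if (maxSize != 0 && curSplitSize >= maxSize) {
        // 128 + 128 reaches maxSize: cut a node-local split here
        System.out.println("node-local split: " + currentSplit); // [128, 128]
        currentSplit.clear();
        curSplitSize = 0;
      }
    }
    if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
      System.out.println("leftover split: " + currentSplit);
    } else {
      // the 32-byte leftover is below minSizeNode: returned to the pool
      System.out.println("returned to blockToNodes: " + currentSplit); // [32]
    }
  }
}
```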
Blocks that could not form a node-local split are next processed rack by rack. If the blocks accumulated for a rack stay below minSizeRack, they are parked in overflowBlocks for the final pass:

```java
// if blocks in a rack are below the specified minimum size, then keep them
// in 'overflow'. After the processing of all racks is complete, these
// overflow blocks will be combined into splits.
ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
ArrayList<String> racks = new ArrayList<String>();

// Process all racks over and over again until there is no more work to do.
while (blockToNodes.size() > 0) {
  // Create one split for this rack before moving over to the next rack.
  // Come back to this rack after creating a single split for each of the
  // remaining racks.
  // Process one rack location at a time; combine all possible blocks that
  // reside on this rack as one split (constrained by the minimum and
  // maximum split size).

  // iterate over all racks
  for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = rackToBlocks.entrySet().iterator();
       iter.hasNext();) {
    Map.Entry<String, List<OneBlockInfo>> one = iter.next();
    racks.add(one.getKey());
    List<OneBlockInfo> blocks = one.getValue();

    // for each block, copy it into validBlocks. Delete it from
    // blockToNodes so that the same block does not appear in
    // two different splits.
    boolean createdSplit = false;
    for (OneBlockInfo oneblock : blocks) {
      if (blockToNodes.containsKey(oneblock)) {
        validBlocks.add(oneblock);
        blockToNodes.remove(oneblock);
        curSplitSize += oneblock.length;

        // if the accumulated size reaches maxSize, then create this split.
        if (maxSize != 0 && curSplitSize >= maxSize) {
          // create an input split and add it to the splits array
          addCreatedSplit(job, splits, getHosts(racks), validBlocks);
          createdSplit = true;
          break;
        }
      }
    }

    // if we created a split, then just go to the next rack
    if (createdSplit) {
      curSplitSize = 0;
      validBlocks.clear();
      racks.clear();
      continue;
    }

    if (!validBlocks.isEmpty()) {
      if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
        // if there is a minimum size specified, then create a single split;
        // otherwise, store these blocks into the overflow data structure
        addCreatedSplit(job, splits, getHosts(racks), validBlocks);
      } else {
        // There were a few blocks in this rack that remained to be processed.
        // Keep them in the 'overflow' block list. These will be combined later.
        overflowBlocks.addAll(validBlocks);
      }
    }
    curSplitSize = 0;
    validBlocks.clear();
    racks.clear();
  }
}
```
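The three thresholds (maxSize, minSizeNode, minSizeRack) are not hard-coded; in the old mapred API shown in this post they are read from the job configuration (a subclass may also call the protected setters setMaxSplitSize, setMinSplitSizeNode, and setMinSplitSizeRack). A hedged configuration sketch, with illustrative byte values chosen so that minSizeNode <= minSizeRack <= maxSize, which is what the accumulation logic expects:

```java
import org.apache.hadoop.mapred.JobConf;

public class CombineSplitConfig {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    // Illustrative values in bytes (256 MB / 64 MB / 128 MB)
    job.setLong("mapred.max.split.size", 256L * 1024 * 1024);          // maxSize
    job.setLong("mapred.min.split.size.per.node", 64L * 1024 * 1024);  // minSizeNode
    job.setLong("mapred.min.split.size.per.rack", 128L * 1024 * 1024); // minSizeRack
  }
}
```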
After every rack has been handled, the overflow blocks are combined, in order, into splits capped at maxSize:

```java
// Process all overflow blocks
for (OneBlockInfo oneblock : overflowBlocks) {
  validBlocks.add(oneblock);
  curSplitSize += oneblock.length;

  // This might cause an existing rack location to be re-added,
  // but it should be ok.
  for (int i = 0; i < oneblock.racks.length; i++) {
    racks.add(oneblock.racks[i]);
  }

  // if the accumulated size reaches maxSize, then create this split.
  if (maxSize != 0 && curSplitSize >= maxSize) {
    // create an input split and add it to the splits array
    addCreatedSplit(job, splits, getHosts(racks), validBlocks);
    curSplitSize = 0;
    validBlocks.clear();
    racks.clear();
  }
}
```
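Putting the three phases together with the hypothetical numbers from the earlier sketch: with maxSize = 256 MB, minSizeNode = 64 MB, and minSizeRack = 128 MB, a node holding 128 + 128 + 32 MB of blocks yields one 256 MB node-local split; the 32 MB leftover goes back to the pool, and if its rack gathers only, say, 96 MB in total (below minSizeRack), those blocks land in overflowBlocks, where this final loop packs them into splits of at most 256 MB.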
Whatever is still accumulated at the end becomes the final split:

```java
// Process any remaining blocks, if any.
if (!validBlocks.isEmpty()) {
  addCreatedSplit(job, splits, getHosts(racks), validBlocks);
}
```
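For completeness: newer Hadoop releases ship a ready-made concrete subclass, CombineTextInputFormat, in the new mapreduce API, so the algorithm traced above can be used without writing a RecordReader. A minimal driver sketch (the input path and sizes are hypothetical, and mapper/reducer setup is omitted):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class CombineDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // new-API equivalents of the thresholds discussed above
    conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 256L * 1024 * 1024);
    conf.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", 64L * 1024 * 1024);
    conf.setLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 128L * 1024 * 1024);

    Job job = Job.getInstance(conf, "combine-small-files");
    job.setInputFormatClass(CombineTextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path("/data/small-files")); // hypothetical path
    // set mapper/reducer classes and submit as usual
  }
}
```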
Original article: http://www.cnblogs.com/yurunmiao/p/4282497.html