In HDFS, data is stored and organized in the form of blocks. The default replication factor is 3, so each block typically has 3 identical replicas spread across different DataNodes. Every DataNode therefore holds a large number of blocks. How are these blocks organized and linked together, and how does HDFS manipulate the blocks and their associated bookkeeping when a block is added or removed? A linked list? An array? A HashMap? The answer lies in the BlockInfoContiguous class.
This class does not exist in every Hadoop release; in the current hadoop-trunk code it is barely used anymore, so to be clear, the version I studied is hadoop-2.7.1. In this version, BlockInfoContiguous is the class that links a block to everything needed to locate it. The official source comment on BlockInfoContiguous reads:
```java
/**
 * BlockInfo class maintains for a given block
 * the {@link INodeFile} it is part of and datanodes where the replicas of
 * the block are stored.
 * BlockInfo class maintains for a given block
 * the {@link BlockCollection} it is part of and datanodes where the replicas of
 * the block are stored.
 */
@InterfaceAudience.Private
public class BlockInfoContiguous extends Block
    implements LightWeightGSet.LinkedElement {
```
BlockInfoContiguous has two key internal members: BlockCollection and triplets. The former ties the block back to the file it belongs to, carrying information such as the replication factor; the design of the triplets object array is the focus of this post, so the following sections analyze its structure and the thinking behind it in detail.
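As a rough sketch of the two fields under discussion (paraphrased from the surrounding description; the real 2.7.1 class declares a few more members):

```java
// Paraphrased field declarations (the real class has additional members):
private BlockCollection bc;   // the collection (file) this block belongs to
private Object[] triplets;    // per-storage location plus prev/next links
```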
triplets starts out as an Object array of a certain length, but two kinds of objects get stored into it. The source comment on the field reads:
```java
/**
 * This array contains triplets of references. For each i-th storage, the
 * block belongs to triplets[3*i] is the reference to the
 * {@link DatanodeStorageInfo} and triplets[3*i+1] and triplets[3*i+2] are
 * references to the previous and the next blocks, respectively, in the list
 * of blocks belonging to this storage.
 *
 * Using previous and next in Object triplets is done instead of a
 * {@link LinkedList} list to efficiently use memory. With LinkedList the cost
 * per replica is 42 bytes (LinkedList#Entry object per replica) versus 16
 * bytes using the triplets.
 */
private Object[] triplets;
```

The comment above boils down to the following points:
1. If the current block is stored on i data-storages, the triplets array has 3 * i elements; the number of storages is generally determined by the replication factor.
2. Within each 3-element group, triplets[3 * i] holds the data-storage (DatanodeStorageInfo), triplets[3 * i + 1] holds the previous block on that storage's list, and triplets[3 * i + 2] holds the next block; the block objects stored there are themselves BlockInfoContiguous instances, as the sketch below illustrates.
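To make the indexing concrete, here is a minimal toy sketch (toy values, not the Hadoop source) of how the slots are laid out and addressed:

```java
// Toy sketch of the triplets layout for a block with 3 replicas.
short replication = 3;
Object[] triplets = new Object[3 * replication]; // one 3-slot group per replica

int i = 0;                               // the i-th storage holding a replica
Object storage = triplets[3 * i];        // the DatanodeStorageInfo reference
Object prevBlock = triplets[3 * i + 1];  // previous block on that storage's list
Object nextBlock = triplets[3 * i + 2];  // next block on that storage's list
```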
So you can picture this as one "giant linked list", except that, to use memory more efficiently, it avoids the JDK's LinkedList structure. With the triplets design covered, let's look back at how BlockInfoContiguous is composed; here is a structure diagram:
DatanodeStorageInfo 1, 2 and 3 are the storages holding the current block, so the length of triplets is initialized from the replication factor:
```java
/**
 * Construct an entry for blocksmap
 * @param replication the block's replication factor
 */
public BlockInfoContiguous(short replication) {
  this.triplets = new Object[3*replication];
  this.bc = null;
}
```
Each data-storage holds a large number of blocks, so by following a block's next and previous pointers you can traverse every block on the storage. The structure of the blocks held by a DatanodeStorageInfo can be shown as in the figure below:
The head block here corresponds to the blockList field in DatanodeStorageInfo:
```java
private volatile BlockInfoContiguous blockList = null;
```
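So a full traversal needs nothing but this head and each block's own pointers. A minimal sketch inside DatanodeStorageInfo, assuming the findStorageInfo and getNext accessors on BlockInfoContiguous (getNext reads triplets[3*i + 2]); this is exactly the pattern the iterator at the end of this post uses:

```java
// Walk every block on this storage, starting from the blockList head.
int count = 0;
BlockInfoContiguous cur = blockList;
while (cur != null) {
  count++;
  // ask the block for this storage's slot index, then follow its next link
  cur = cur.getNext(cur.findStorageInfo(this));
}
```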
Zooming in on the same storage, the relationship between one block and the next looks like the figure below:
Every block operation on a data-node is performed against the block list it maintains.
Adding and removing a block on a data-node therefore translates into linked-list operations on BlockInfoContiguous. There are two main operations: addBlock and removeBlock. Both are defined in DatanodeStorageInfo, and they ultimately map to the list methods listInsert and listRemove. Let's analyze these two methods in detail.
listInsert adds a block to the corresponding storage's list; the operation is triggered from DatanodeStorageInfo's addBlock method, shown below:
```java
public AddBlockResult addBlock(BlockInfoContiguous b) {
  // First check whether the block belongs to a different storage
  // on the same DN.
  AddBlockResult result = AddBlockResult.ADDED;
  DatanodeStorageInfo otherStorage =
      b.findStorageInfo(getDatanodeDescriptor());

  if (otherStorage != null) {
    if (otherStorage != this) {
      // The block belongs to a different storage. Remove it first.
      otherStorage.removeBlock(b);
      result = AddBlockResult.REPLACED;
    } else {
      // The block is already associated with this storage.
      return AddBlockResult.ALREADY_EXIST;
    }
  }

  // add to the head of the data-node list
  b.addStorage(this);
  blockList = b.listInsert(blockList, this);
  numBlocks++;
  return result;
}
```

The two calls at the end are the ones to focus on: b.addStorage and b.listInsert. b.addStorage records the current storage inside the newly added block; since the block has just been written to this storage, the storage info has to be written into the list state the block itself maintains.
```java
/**
 * Add a {@link DatanodeStorageInfo} location for a block
 */
boolean addStorage(DatanodeStorageInfo storage) {
  // find the last null node
  // grow triplets by one storage slot, i.e. by 3 array elements
  int lastNode = ensureCapacity(1);
  // store the storage info at triplets[3 * lastNode]
  setStorageInfo(lastNode, storage);
  // set the next block to null at triplets[3 * lastNode + 2]
  setNext(lastNode, null);
  // set the previous block to null at triplets[3 * lastNode + 1]
  setPrevious(lastNode, null);
  return true;
}
```
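The ensureCapacity call above grows the array only when no free tail slots are left; a sketch of its logic, paraphrased from the 2.7.1 source (numNodes counts the occupied storage slots):

```java
private int ensureCapacity(int num) {
  assert this.triplets != null : "BlockInfo is not initialized";
  int last = numNodes();
  if (triplets.length >= (last + num) * 3)
    return last;  // enough free slots at the tail already
  // Not enough space left: allocate a bigger array and copy the old
  // triplets over. Normally only happens when replication is increased.
  Object[] old = triplets;
  triplets = new Object[(last + num) * 3];
  System.arraycopy(old, 0, triplets, 0, last * 3);
  return last;
}
```

The setter helpers that addStorage calls are straightforward index arithmetic: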
```java
private void setStorageInfo(int index, DatanodeStorageInfo storage) {
  assert this.triplets != null : "BlockInfo is not initialized";
  assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
  triplets[index*3] = storage;
}

/**
 * Return the previous block on the block list for the datanode at
 * position index. Set the previous block on the list to "to".
 *
 * @param index - the datanode index
 * @param to - block to be set to previous on the list of blocks
 * @return current previous block on the list of blocks
 */
private BlockInfoContiguous setPrevious(int index, BlockInfoContiguous to) {
  assert this.triplets != null : "BlockInfo is not initialized";
  assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
  BlockInfoContiguous info = (BlockInfoContiguous)triplets[index*3+1];
  triplets[index*3+1] = to;
  return info;
}
```

The other call inserts the block into the list the storage maintains: the head blockList is passed in as an argument, and the return value is assigned back to it, which amounts to updating the head pointer.
```java
blockList = b.listInsert(blockList, this);
```
```java
/**
 * Insert this block into the head of the list of blocks
 * related to the specified DatanodeStorageInfo.
 * If the head is null then form a new list.
 * @return current block as the new head of the list.
 */
BlockInfoContiguous listInsert(BlockInfoContiguous head,
    DatanodeStorageInfo storage) {
  // find the index of the given data-storage in this block's triplets
  int dnIndex = this.findStorageInfo(storage);
  assert dnIndex >= 0 : "Data node is not found: current";
  assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
      "Block is already in the list and cannot be inserted.";
  this.setPrevious(dnIndex, null);
  // point this block's next at the current head
  this.setNext(dnIndex, head);
  if(head != null)
    // point the head's previous back at this block
    head.setPrevious(head.findStorageInfo(storage), this);
  // this block becomes the new head
  return this;
}
```

The null next pointer set earlier in addStorage is now wired to the old head. For example, if the storage's list is head -> B1 -> B2 and a new block B0 is inserted, then B0's next becomes B1, B1's previous becomes B0, and blockList is updated to B0. Shown graphically:
The corresponding inverse operation is the storage's removeBlock. After a block is deleted on the storage, this list operation is triggered.
```java
public boolean removeBlock(BlockInfoContiguous b) {
  blockList = b.listRemove(blockList, this);
  if (b.removeStorage(this)) {
    numBlocks--;
    return true;
  } else {
    return false;
  }
}
```

Again there are two steps: remove the target block from the list, then release the storage info from the block itself. First, listRemove unlinks the target block:
```java
/**
 * Remove this block from the list of blocks
 * related to the specified DatanodeStorageInfo.
 * If this block is the head of the list then return the next block as
 * the new head.
 * @return the new head of the list or null if the list becomes
 * empty after deletion.
 */
BlockInfoContiguous listRemove(BlockInfoContiguous head,
    DatanodeStorageInfo storage) {
  if(head == null)
    return null;
  int dnIndex = this.findStorageInfo(storage);
  if(dnIndex < 0) // this block is not on the data-node list
    return head;

  // clear this block's own prev/next pointers for this storage
  BlockInfoContiguous next = this.getNext(dnIndex);
  BlockInfoContiguous prev = this.getPrevious(dnIndex);
  this.setNext(dnIndex, null);
  this.setPrevious(dnIndex, null);
  // link the former neighbours to each other
  if(prev != null)
    prev.setNext(prev.findStorageInfo(storage), next);
  if(next != null)
    next.setPrevious(next.findStorageInfo(storage), prev);
  if(this == head)  // removing the head
    head = next;
  return head;
}
```

Shown graphically:
Before removeBlock:
After removeBlock:
The other step clears the target block's entry for the relevant data-storage:
```java
/**
 * Remove {@link DatanodeStorageInfo} location for a block
 */
boolean removeStorage(DatanodeStorageInfo storage) {
  int dnIndex = findStorageInfo(storage);
  if(dnIndex < 0) // the node is not found
    return false;
  assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
      "Block is still in the list and must be removed first.";
  // find the last not null node
  int lastNode = numNodes()-1;
  // replace current node triplet by the lastNode one
  setStorageInfo(dnIndex, getStorageInfo(lastNode));
  setNext(dnIndex, getNext(lastNode));
  setPrevious(dnIndex, getPrevious(lastNode));
  // set the last triplet to null
  setStorageInfo(lastNode, null);
  setNext(lastNode, null);
  setPrevious(lastNode, null);
  return true;
}
```

What happens here is that the last occupied slot is moved into the position being deleted, and the old last slot is nulled out. Keeping all the free slots at the tail makes it easy for ensureCapacity to grow the triplets array dynamically later, without recreating the object array.
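To see the compaction in isolation, here is a self-contained toy (plain arrays, not Hadoop code) showing how the last occupied slot is moved into the freed position so that all null slots stay at the tail:

```java
import java.util.Arrays;

public class TripletCompactionDemo {
  public static void main(String[] args) {
    // three storage slots: [S0,p0,n0, S1,p1,n1, S2,p2,n2]
    Object[] triplets = {"S0","p0","n0", "S1","p1","n1", "S2","p2","n2"};
    int dnIndex = 0;   // remove the storage in slot 0
    int lastNode = 2;  // index of the last occupied slot
    for (int k = 0; k < 3; k++) {
      triplets[dnIndex * 3 + k] = triplets[lastNode * 3 + k]; // move last slot down
      triplets[lastNode * 3 + k] = null;                      // free the tail slot
    }
    // prints [S2, p2, n2, S1, p1, n1, null, null, null]
    System.out.println(Arrays.toString(triplets));
  }
}
```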
moveBlockToHead is another method of BlockInfoContiguous that gets called frequently; it came up in an earlier post of mine analyzing how the NameNode processes reported blocks, where it is called from the reportDiff method.
```java
private void reportDiff(DatanodeStorageInfo storageInfo,
    BlockListAsLongs newReport,
    Collection<BlockInfoContiguous> toAdd,    // add to DatanodeDescriptor
    Collection<Block> toRemove,               // remove from DatanodeDescriptor
    Collection<Block> toInvalidate,           // should be removed from DN
    Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
    Collection<StatefulBlockInfo> toUC) {     // add to under-construction list

  // place a delimiter in the list which separates blocks
  // that have been reported from those that have not
  BlockInfoContiguous delimiter =
      new BlockInfoContiguous(new Block(), (short) 1);
  AddBlockResult result = storageInfo.addBlock(delimiter);
  assert result == AddBlockResult.ADDED :
      "Delimiting block cannot be present in the node";
  int headIndex = 0; //currently the delimiter is in the head of the list
  int curIndex;
  //...

  // scan the report and process newly reported blocks
  for (BlockReportReplica iblk : newReport) {
    ...
    // move block to the head of the list
    if (storedBlock != null &&
        (curIndex = storedBlock.findStorageInfo(storageInfo)) >= 0) {
      headIndex =
          storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex);
    }
  }
  ...
```

The idea is that every reported block gets moved to one side of the delimiter block, so at the end it is clear which blocks were and were not reported in this round; moveBlockToHead's job is simply to move a block straight to the head of the list.
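To make the delimiter trick tangible, here is a runnable toy that uses java.util.LinkedList purely for illustration (the real code does the same thing in place on the triplets list): reported blocks are moved ahead of the delimiter, so whatever remains behind it was not reported this round.

```java
import java.util.LinkedList;
import java.util.List;

public class DelimiterDemo {
  public static void main(String[] args) {
    LinkedList<String> stored = new LinkedList<>(List.of("b1", "b2", "b3", "b4"));
    String delimiter = "DELIM";
    stored.addFirst(delimiter);        // the delimiter starts at the head

    for (String reported : List.of("b3", "b1")) {
      if (stored.remove(reported)) {   // block found in the stored list...
        stored.addFirst(reported);     // ...move it ahead of the delimiter
      }
    }

    int d = stored.indexOf(delimiter);
    // prints [b1, b3] and [b2, b4]
    System.out.println("reported this round: " + stored.subList(0, d));
    System.out.println("not reported: " + stored.subList(d + 1, stored.size()));
  }
}
```

Here is moveBlockToHead itself: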
```java
/**
 * Remove this block from the list of blocks related to the specified
 * DatanodeDescriptor. Insert it into the head of the list of blocks.
 *
 * @return the new head of the list.
 */
public BlockInfoContiguous moveBlockToHead(BlockInfoContiguous head,
    DatanodeStorageInfo storage, int curIndex, int headIndex) {
  if (head == this) {
    return this;
  }
  // point this block's next at the current head
  BlockInfoContiguous next = this.setNext(curIndex, head);
  // clear this block's previous pointer
  BlockInfoContiguous prev = this.setPrevious(curIndex, null);

  // point the old head's previous back at this block
  head.setPrevious(headIndex, this);
  // link this block's former neighbours to each other
  prev.setNext(prev.findStorageInfo(storage), next);
  if (next != null) {
    next.setPrevious(next.findStorageInfo(storage), prev);
  }
  return this;
}
```

Shown graphically:
BlockInfoContiguous also contains a number of other helper methods; here I focus on three kinds, which are also the three most frequently called. The figure below groups the main methods, with the same color indicating the same kind of operation.
To traverse all the blocks on a node, we need an iterator that can hand out blocks through a next()-style method. The JDK's linked list offers one out of the box; for a list designed the way HDFS's is, HDFS likewise provides its own matching iterator.
```java
private static class BlockIterator implements Iterator<BlockInfoContiguous> {
  private int index = 0;
  private final List<Iterator<BlockInfoContiguous>> iterators;

  private BlockIterator(final DatanodeStorageInfo... storages) {
    List<Iterator<BlockInfoContiguous>> iterators =
        new ArrayList<Iterator<BlockInfoContiguous>>();
    for (DatanodeStorageInfo e : storages) {
      iterators.add(e.getBlockIterator());
    }
    this.iterators = Collections.unmodifiableList(iterators);
  }

  @Override
  public boolean hasNext() {
    update();
    return !iterators.isEmpty() && iterators.get(index).hasNext();
  }

  @Override
  public BlockInfoContiguous next() {
    update();
    return iterators.get(index).next();
  }

  @Override
  public void remove() {
    throw new UnsupportedOperationException("Remove unsupported.");
  }

  private void update() {
    while(index < iterators.size() - 1 && !iterators.get(index).hasNext()) {
      index++;
    }
  }
}
```

The storages are passed in as constructor arguments:
```java
DatanodeStorageInfo[] getStorageInfos() {
  synchronized (storageMap) {
    final Collection<DatanodeStorageInfo> storages = storageMap.values();
    return storages.toArray(new DatanodeStorageInfo[storages.size()]);
  }
}
```

The per-storage iterator itself is designed as follows:
```java
/**
 * Iterates over the list of blocks belonging to the data-node.
 */
class BlockIterator implements Iterator<BlockInfoContiguous> {
  private BlockInfoContiguous current;

  BlockIterator(BlockInfoContiguous head) {
    this.current = head;
  }

  public boolean hasNext() {
    return current != null;
  }

  public BlockInfoContiguous next() {
    BlockInfoContiguous res = current;
    current = current.getNext(
        current.findStorageInfo(DatanodeStorageInfo.this));
    return res;
  }

  public void remove() {
    throw new UnsupportedOperationException("Sorry. can't remove.");
  }
}
```
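next() here leans on getNext, which mirrors the setNext/setPrevious indexing shown earlier; simplified sketches of the accessors follow (the real 2.7.1 methods carry extra assertions):

```java
// Simplified accessor sketches; the real methods also assert array bounds.
BlockInfoContiguous getPrevious(int index) {
  return (BlockInfoContiguous) triplets[index * 3 + 1];
}

BlockInfoContiguous getNext(int index) {
  return (BlockInfoContiguous) triplets[index * 3 + 2];
}
```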
This iterator is used, for example, in DecommissionManager, where handleInsufficientlyReplicated feeds it into processBlocksForDecomInternal:
```java
/**
 * Returns a list of blocks on a datanode that are insufficiently
 * replicated, i.e. are under-replicated enough to prevent decommission.
 * <p/>
 * As part of this, it also schedules replication work for
 * any under-replicated blocks.
 *
 * @param datanode
 * @return List of insufficiently replicated blocks
 */
private AbstractList<BlockInfoContiguous> handleInsufficientlyReplicated(
    final DatanodeDescriptor datanode) {
  AbstractList<BlockInfoContiguous> insufficient = new ChunkedArrayList<>();
  processBlocksForDecomInternal(datanode, datanode.getBlockIterator(),
      insufficient, false);
  return insufficient;
}
```
That is the linked list that ties together HDFS's enormous number of blocks, and a handy refresher on the linked-list operations from data structures class as well. One caveat: once the number of blocks in a cluster reaches the tens of millions, BlockInfoContiguous itself consumes a large amount of memory, since there will be tens of millions of INodeFile and BlockInfoContiguous objects alive at the same time.
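To put the numbers from the triplets comment in perspective: with 10 million blocks at replication factor 3, the list links alone cost roughly 30,000,000 × 16 B ≈ 480 MB under the triplets scheme, versus roughly 30,000,000 × 42 B ≈ 1.26 GB with java.util.LinkedList. A real saving, but still far from free on the NameNode heap.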
Original article: http://blog.csdn.net/androidlushangderen/article/details/50652049