标签:cti dfs iss normal sch 优先 ext hose linkedset
/**
* Parse the data-nodes the block belongs to and choose one,
* which will be the replication source.
*
* We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes
* since the former do not have write traffic and hence are less busy.
* We do not use already decommissioned nodes as a source.
* Otherwise we choose a random node among those that did not reach their
* replication limits. However, if the replication is of the highest priority
* and all nodes have reached their replication limits, we will choose a
* random node despite the replication limit.
*
* In addition form a list of all nodes containing the block
* and calculate its replication numbers.
*解析块所属的数据节点并选择一个,这将是复制源。
我们更偏向处于DECOMMISSION_INPROGRESS(退役中)状态的节点到其他节点,因为前者没有写入流量,因此不太忙。
我们不使用已经退役的节点作为源。除此之外,我们会在未达到复制限制的节点中选择一个随机节点。
但是,如果复制到达最高优先级并且所有节点都已达到其复制限制,我们将随机选择一个节点,尽管有复制限制。
另外,形成包含块的所有节点的列表并计算其复制数。
*
* @param block Block for which a replication source is needed
* @param containingNodes List to be populated with nodes found to contain the
* given block
* @param nodesContainingLiveReplicas List to be populated with nodes found to
* contain live replicas of the given block
* @param numReplicas NumberReplicas instance to be initialized with the
* counts of live, corrupt, excess, and
* decommissioned replicas of the given
* block.
* @param priority integer representing replication priority of the given
* block
* @return the DatanodeDescriptor of the chosen node from which to replicate
* the given block
*/
@VisibleForTesting
DatanodeDescriptor chooseSourceDatanode(Block block,
List<DatanodeDescriptor> containingNodes,
List<DatanodeStorageInfo> nodesContainingLiveReplicas,
NumberReplicas numReplicas,
int priority) {
containingNodes.clear();
nodesContainingLiveReplicas.clear();
DatanodeDescriptor srcNode = null;
int live = 0;
int decommissioned = 0;
int corrupt = 0;
int excess = 0;
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
LightWeightLinkedSet<Block> excessBlocks =
excessReplicateMap.get(node.getDatanodeUuid());
int countableReplica = storage.getState() == State.NORMAL ? 1 : 0;
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
corrupt += countableReplica;
else if (node.isDecommissionInProgress() || node.isDecommissioned())
decommissioned += countableReplica;
else if (excessBlocks != null && excessBlocks.contains(block)) {
excess += countableReplica;
} else {
nodesContainingLiveReplicas.add(storage);
live += countableReplica;
}
containingNodes.add(node);
// Check if this replica is corrupt
// If so, do not select the node as src node
if ((nodesCorrupt != null) && nodesCorrupt.contains(node))
continue;
if(priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY
&& node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
{
continue; // already reached replication limit
}
if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit)
{
continue;
}
// the block must not be scheduled for removal on srcNode
if(excessBlocks != null && excessBlocks.contains(block))
continue;
// never use already decommissioned nodes
if(node.isDecommissioned())
continue;
// we prefer nodes that are in DECOMMISSION_INPROGRESS state
if(node.isDecommissionInProgress() || srcNode == null) {
srcNode = node;
continue;
}
if(srcNode.isDecommissionInProgress())
continue;
// switch to a different node randomly
// this to prevent from deterministically selecting the same node even
// if the node failed to replicate the block on previous iterations
if(DFSUtil.getRandom().nextBoolean())
srcNode = node;
}
if(numReplicas != null)
numReplicas.initialize(live, decommissioned, corrupt, excess, 0);
return srcNode;
}
hdfs 如何实现退役节点快速下线(也就是退役节点上的数据块快速迁移)
标签:cti dfs iss normal sch 优先 ext hose linkedset
原文地址:https://www.cnblogs.com/jiangxiaoxian/p/9665588.html