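Heartbeat sent from the NM side
//The NM sends a heartbeat, which leads to a NODE_UPDATE event being added, and a simple response is returned. The design is asynchronous and event-driven: the event then drives assignContainers, which reads the demand from the resource-request structure and allocates resources
//How AsyncDispatcher works
//It holds one event queue and a mapping from eventtype.class to handler (one handler per class; the class is an Enum that may have many values, and the per-value logic lives inside the handler)
//An event is taken from the queue, its type is read from the event, the handler is looked up, and the handler's handle(event) method is called
//nodeHeartbeat adds an RMNodeStatusEvent (event type RMNodeEventType.STATUS_UPDATE)
The RM registers the handler for that event type
That handler ultimately calls RMNodeImpl
RMNodeImpl then adds a SchedulerEvent
//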
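The dispatcher idea described above can be sketched in a few lines. This is only a toy model under stated assumptions: every name in it (MiniDispatcher, SketchEvent, SketchHandler, NodeEventKind, SchedEventKind) is made up for illustration and is not the Hadoop API, and it drains the queue on the calling thread instead of a dedicated dispatcher thread so that the flow is easy to follow.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// All names below are hypothetical; this is not the Hadoop API.
interface SketchEvent {
  Enum<?> getType();
}

interface SketchHandler {
  void handle(SketchEvent event);
}

enum NodeEventKind { STATUS_UPDATE }
enum SchedEventKind { NODE_UPDATE }

final class SimpleEvent implements SketchEvent {
  private final Enum<?> type;
  SimpleEvent(Enum<?> type) { this.type = type; }
  @Override public Enum<?> getType() { return type; }
}

final class MiniDispatcher {
  // One queue for all events, one handler per enum class.
  private final BlockingQueue<SketchEvent> queue = new LinkedBlockingQueue<>();
  private final Map<Class<?>, SketchHandler> handlers = new HashMap<>();

  void register(Class<? extends Enum<?>> typeClass, SketchHandler handler) {
    handlers.put(typeClass, handler);
  }

  void post(SketchEvent event) {
    queue.add(event);
  }

  // Processes queued events until the queue is empty and returns how many ran.
  // A real dispatcher would run this loop on its own thread.
  int drain() {
    int handled = 0;
    SketchEvent event;
    while ((event = queue.poll()) != null) {
      // The enum class of the event type selects the handler.
      Class<?> typeClass = event.getType().getDeclaringClass();
      SketchHandler handler = handlers.get(typeClass);
      if (handler == null) {
        throw new IllegalStateException("No handler for " + typeClass);
      }
      handler.handle(event);
      handled++;
    }
    return handled;
  }
}

class MiniDispatcherDemo {
  public static void main(String[] args) {
    MiniDispatcher d = new MiniDispatcher();
    List<String> log = new ArrayList<>();
    // The node handler reacts to STATUS_UPDATE by posting a scheduler event.
    d.register(NodeEventKind.class, e -> {
      log.add("node:" + e.getType());
      d.post(new SimpleEvent(SchedEventKind.NODE_UPDATE));
    });
    d.register(SchedEventKind.class, e -> log.add("sched:" + e.getType()));
    d.post(new SimpleEvent(NodeEventKind.STATUS_UPDATE));
    d.drain();
    System.out.println(log);
  }
}
```

The demo mirrors the chain in this post: one handler receives a node-level event and posts a scheduler-level event back to the same dispatcher, which a second handler then picks up.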
The NodeManager class calls the following class
NodeStatusUpdaterImpl class
protected void startStatusUpdater() {
statusUpdaterRunnable = new Runnable() {
@Override
@SuppressWarnings("unchecked")
public void run() {
int lastHeartBeatID = 0;
while (!isStopped) {
....
response = resourceTracker.nodeHeartbeat(request); //sends the heartbeat to ResourceTrackerService
..
This is a remote RPC call into the ResourceTrackerService class
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException {
NodeStatus remoteNodeStatus = request.getNodeStatus();
/**
* Here is the node heartbeat sequence...
* 1. Check if it's a registered node
* 2. Check if it's a valid (i.e. not excluded) node
* 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
* 4. Send healthStatus to RMNode
*/
....
// 4. Send status to RMNode, saving the latest response.
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(), //RMNodeStatusEvent is a subclass of RMNodeEvent; its constructor sets the event type to RMNodeEventType.STATUS_UPDATE
//The RM uses register to give the AsyncDispatcher a handler for this type (see the code further down); here that is NodeEventDispatcher, which internally uses RMNodeImpl, which in turn raises
//scheduler-related events
remoteNodeStatus.getContainersStatuses(), // the status of each container, as a list
remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse)); //creates a new event and puts it on the AsyncDispatcher queue; in the end this should trigger the ResourceScheduler
rmContext is built in ResourceManager; the key point here is which Dispatcher it uses
..
this.rmContext =
new RMContextImpl(this.rmDispatcher, rmStore,
this.containerAllocationExpirer, amLivelinessMonitor,
amFinishingMonitor, delegationTokenRenewer, this.amRmTokenSecretManager,
this.containerTokenSecretManager, this.nmTokenSecretManager,
this.clientToAMSecretManager);
..
protected Dispatcher createDispatcher() {
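return new AsyncDispatcher(); //rmDispatcher is built by this method: org.apache.hadoop.yarn.event.AsyncDispatcher
//It has an event queue and a map from event type to event handler; an asynchronous thread reads the event type from each event (the type is set inside the event itself)
//and then looks up the handler; inside the handler, the logic branches on the specific enum value
//The type.class-to-handler mapping is set up through register
}
//The RM also registers the event handler for each event type; the asynchronous thread inside AsyncDispatcher uses this map to decide which handler to call
this.rmDispatcher.register(SchedulerEventType.class, //this enum includes the NODE_UPDATE value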
this.schedulerDispatcher);
rmDispatcher
// Register event handler for RmAppEvents
this.rmDispatcher.register(RMAppEventType.class,
new ApplicationEventDispatcher(this.rmContext));
// Register event handler for RmAppAttemptEvents
this.rmDispatcher.register(RMAppAttemptEventType.class,
new ApplicationAttemptEventDispatcher(this.rmContext));
// Register event handler for RmNodes
this.rmDispatcher.register(RMNodeEventType.class, //an enum whose values include STATUS_UPDATE; the handling behind NodeEventDispatcher branches on the RMNodeEventType value,
//much like case ... when
new NodeEventDispatcher(this.rmContext)); //registers the event handler
The NodeEventDispatcher class, inside the RM
public void handle(RMNodeEvent event) { //the event handling method
NodeId nodeId = event.getNodeId();
RMNode node = this.rmContext.getRMNodes().get(nodeId);
if (node != null) {
try {
((EventHandler<RMNodeEvent>) node).handle(event) ; //the RMNode is cast to a handler; RMNodeImpl also implements EventHandler, and internally
//it triggers the scheduler-related calls
} catch (Throwable t) {
LOG.error("Error in handling event type " + event.getType()
+ " for node " + nodeId, t);
}
//Summary
The NodeManager sends its heartbeat to ResourceTrackerService on the RM side by calling nodeHeartbeat, which sends a STATUS_UPDATE event to the RMNode. Inside the RMNodeImpl class:
.addTransition(NodeState.RUNNING,
EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
StatusUpdateWhenHealthyTransition class, transition method
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeUpdateSchedulerEvent(rmNode)); //this triggers the scheduler
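To make the addTransition idea concrete, here is a rough sketch of a table-driven state machine. It is a toy under stated assumptions, not RMNodeImpl: SketchNode, SketchNodeState and SketchNodeEvent are hypothetical names, the health flag is passed in directly, and the rule "a healthy update emits NODE_UPDATE, an unhealthy one moves to UNHEALTHY" is a simplification chosen for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical names; a toy model, not RMNodeImpl or the YARN state machine classes.
enum SketchNodeState { NEW, RUNNING, UNHEALTHY }
enum SketchNodeEvent { STARTED, STATUS_UPDATE }

final class SketchNode {
  // (current state, event) -> transition that returns the next state.
  private final Map<String, BiFunction<SketchNode, Boolean, SketchNodeState>> table =
      new HashMap<>();
  // Stands in for events posted to the scheduler dispatcher.
  private final List<String> schedulerEvents = new ArrayList<>();
  private SketchNodeState state = SketchNodeState.NEW;

  SketchNode() {
    addTransition(SketchNodeState.NEW, SketchNodeEvent.STARTED,
        (node, healthy) -> SketchNodeState.RUNNING);
    // A transition with two possible target states, chosen at run time.
    addTransition(SketchNodeState.RUNNING, SketchNodeEvent.STATUS_UPDATE,
        (node, healthy) -> {
          if (!healthy) {
            return SketchNodeState.UNHEALTHY;
          }
          node.schedulerEvents.add("NODE_UPDATE"); // this is what wakes the scheduler
          return SketchNodeState.RUNNING;
        });
  }

  private static String key(SketchNodeState s, SketchNodeEvent e) {
    return s + "/" + e;
  }

  private void addTransition(SketchNodeState from, SketchNodeEvent on,
      BiFunction<SketchNode, Boolean, SketchNodeState> transition) {
    table.put(key(from, on), transition);
  }

  // Looks up the transition for the current state and applies it.
  void handle(SketchNodeEvent event, boolean healthy) {
    BiFunction<SketchNode, Boolean, SketchNodeState> t = table.get(key(state, event));
    if (t == null) {
      throw new IllegalStateException("Invalid event " + event + " in state " + state);
    }
    state = t.apply(this, healthy);
  }

  SketchNodeState getState() { return state; }
  List<String> getSchedulerEvents() { return schedulerEvents; }
}

class SketchNodeDemo {
  public static void main(String[] args) {
    SketchNode node = new SketchNode();
    node.handle(SketchNodeEvent.STARTED, true);
    node.handle(SketchNodeEvent.STATUS_UPDATE, true);
    System.out.println(node.getState() + " " + node.getSchedulerEvents());
  }
}
```

The point of the table is that the handler itself contains no switch on state: which code runs for STATUS_UPDATE depends on which state the node is in when the event arrives.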
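//Scheduling is analyzed below
Continuing from the analysis above
StatusUpdateWhenHealthyTransition class
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeUpdateSchedulerEvent(rmNode)); //this triggers the scheduler
This adds a scheduler event
The handler for this event type was already registered in the RM, as follows: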
// Initialize the scheduler
this.scheduler = createScheduler();
this.schedulerDispatcher = createSchedulerEventDispatcher();
addIfService(this.schedulerDispatcher);
this.rmDispatcher.register(SchedulerEventType.class,
this.schedulerDispatcher); //the event handler
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
return new SchedulerEventDispatcher(this.scheduler);
}
SchedulerEventDispatcher builds another queue internally, puts events on it, processes them asynchronously, and finally calls the scheduler to handle each event
public void run() {
SchedulerEvent event;
while (!stopped && !Thread.currentThread().isInterrupted()) {
try {
event = eventQueue.take();
scheduler.handle(event); //this call invokes the scheduler
...
public void handle(SchedulerEvent event) {
try {
int qSize = eventQueue.size();
if (qSize !=0 && qSize %1000 == 0) {
LOG.info("Size of scheduler event-queue is " + qSize);
}
int remCapacity = eventQueue.remainingCapacity();
if (remCapacity < 1000) {
LOG.info("Very low remaining capacity on scheduler event queue: "
+ remCapacity);
}
this.eventQueue.put(event);
} catch (InterruptedException e) {
throw new YarnRuntimeException(e);
}
}
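The two methods above form a producer/consumer pair: handle only enqueues, and the run loop on a separate thread takes events off the queue and passes them to the scheduler. A minimal sketch of that pattern follows. QueueingDispatcher is a hypothetical name, the scheduler is reduced to a plain Consumer, and the queue-size logging is left out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of a handler that queues events for a worker thread.
final class QueueingDispatcher<E> {
  private final BlockingQueue<E> eventQueue = new LinkedBlockingQueue<>();
  private final Consumer<E> scheduler; // stands in for scheduler.handle(event)
  private final Thread worker;
  private volatile boolean stopped = false;

  QueueingDispatcher(Consumer<E> scheduler) {
    this.scheduler = scheduler;
    this.worker = new Thread(this::runLoop, "sketch-scheduler-events");
    this.worker.setDaemon(true);
  }

  void start() {
    worker.start();
  }

  // Called by the main dispatcher thread; returns as soon as the event is queued.
  void handle(E event) {
    try {
      eventQueue.put(event);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  // Worker loop: blocks on take() and hands each event to the scheduler.
  private void runLoop() {
    while (!stopped && !Thread.currentThread().isInterrupted()) {
      try {
        scheduler.accept(eventQueue.take());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // leave the loop on interrupt
      }
    }
  }

  // Stops the worker; events still in the queue are not processed.
  void stop() {
    stopped = true;
    worker.interrupt();
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

class QueueingDispatcherDemo {
  public static void main(String[] args) throws InterruptedException {
    CountDownLatch done = new CountDownLatch(2);
    QueueingDispatcher<String> d = new QueueingDispatcher<>(e -> {
      System.out.println("scheduler got " + e);
      done.countDown();
    });
    d.start();
    d.handle("NODE_ADDED");
    d.handle("NODE_UPDATE");
    done.await(5, TimeUnit.SECONDS);
    d.stop();
  }
}
```

The extra queue means a slow scheduler only delays scheduler events; the thread that called handle is free to go on dispatching other event types.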
//FifoScheduler
public void handle(SchedulerEvent event) {
switch(event.getType()) {
case NODE_ADDED:
{
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
addNode(nodeAddedEvent.getAddedRMNode());
}
break;
case NODE_REMOVED:
{
NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
removeNode(nodeRemovedEvent.getRemovedRMNode());
}
break;
case NODE_UPDATE:
{
NodeUpdateSchedulerEvent nodeUpdatedEvent =
(NodeUpdateSchedulerEvent)event;
nodeUpdate(nodeUpdatedEvent.getRMNode());
....
nodeUpdate method
private synchronized void nodeUpdate(RMNode rmNode) {
FiCaSchedulerNode node = getNode(rmNode.getNodeID());
List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
....
assignContainers(node);
//The core method: assigns containers
private void assignContainers(FiCaSchedulerNode node) {
LOG.debug("assignContainers:" +
" node=" + node.getRMNode().getNodeAddress() +
" #applications=" + applications.size());
// Try to assign containers to applications in fifo order
for (Map.Entry<ApplicationAttemptId, FiCaSchedulerApp> e : applications
.entrySet()) {
FiCaSchedulerApp application = e.getValue();
LOG.debug("pre-assignContainers");
application.showRequests();
synchronized (application) {
// Check if this resource is on the blacklist
if (FiCaSchedulerUtils.isBlacklisted(application, node, LOG)) {
continue;
}
for (Priority priority : application.getPriorities()) {
int maxContainers =
getMaxAllocatableContainers(application, priority, node,
NodeType.OFF_SWITCH);
// Ensure the application needs containers of this priority
if (maxContainers > 0) {
int assignedContainers =
assignContainersOnNode(node, application, priority); //the allocation method
// Do not assign out of order w.r.t priorities
if (assignedContainers == 0) {
break;
}
}
}
}
LOG.debug("post-assignContainers");
application.showRequests();
// Done
if (Resources.lessThan(resourceCalculator, clusterResource,
node.getAvailableResource(), minimumAllocation)) {
break;
}
}
// Update the applications' headroom to correctly take into
// account the containers assigned in this update.
for (FiCaSchedulerApp application : applications.values()) {
application.setHeadroom(Resources.subtract(clusterResource, usedResource));
}
assignContainersOnNode
private int assignContainersOnNode(FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority
) {
// Data-local
int nodeLocalContainers =
assignNodeLocalContainers(node, application, priority);
// Rack-local
int rackLocalContainers =
assignRackLocalContainers(node, application, priority);
.....
assignNodeLocalContainers
..
int assignableContainers =
Math.min(
getMaxAllocatableContainers(application, priority, node,
NodeType.NODE_LOCAL),
request.getNumContainers());
assignedContainers =
assignContainer(node, application, priority,
assignableContainers, request, NodeType.NODE_LOCAL);
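//Summary: the NM sends a heartbeat to the RM, which raises a STATUS_UPDATE event that ends up at the RMNode (RMNodeImpl); the event is handed to RMNodeImpl, which is a state machine
//addTransition shows that StatusUpdateWhenHealthyTransition is invoked; its transition method posts a NodeUpdateSchedulerEvent (type NODE_UPDATE)
//to the asynchronous dispatcher, which ultimately calls the scheduler's assignContainers method. That method reads the resource requests from the application's in-memory request structure, allocates against them,
//and stores the result in the application's in-memory allocation structure, where it waits for the appmaster to fetch it
//When the appmaster fetches, it first updates the in-memory resource-request structure and then reads the in-memory allocation structure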
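The last two points, FIFO allocation on a node heartbeat and results parked until the appmaster pulls them, can be sketched as follows. This is a simplified model under stated assumptions, not FifoScheduler: MiniApp, FifoNode and MiniFifoScheduler are hypothetical names, resources are reduced to memory in MB, each application asks for containers of one fixed size, and priorities and locality are ignored.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of FIFO allocation; not the YARN FifoScheduler.
final class MiniApp {
  final String id;
  final int containerMb;
  int pendingContainers; // the "resource request" structure
  private final List<String> newlyAllocated = new ArrayList<>(); // the "allocation" structure

  MiniApp(String id, int pendingContainers, int containerMb) {
    this.id = id;
    this.pendingContainers = pendingContainers;
    this.containerMb = containerMb;
  }

  void addAllocation(String container) {
    newlyAllocated.add(container);
  }

  // What the appmaster would call: returns the allocations and clears them.
  List<String> pullNewlyAllocated() {
    List<String> result = new ArrayList<>(newlyAllocated);
    newlyAllocated.clear();
    return result;
  }
}

final class FifoNode {
  final String name;
  int availableMb;

  FifoNode(String name, int availableMb) {
    this.name = name;
    this.availableMb = availableMb;
  }
}

final class MiniFifoScheduler {
  // Insertion order stands in for FIFO order of applications.
  private final Map<String, MiniApp> applications = new LinkedHashMap<>();
  private final int minimumAllocationMb;

  MiniFifoScheduler(int minimumAllocationMb) {
    this.minimumAllocationMb = minimumAllocationMb;
  }

  void addApplication(MiniApp app) {
    applications.put(app.id, app);
  }

  // Called on each node heartbeat; returns the number of containers assigned.
  int nodeUpdate(FifoNode node) {
    int assigned = 0;
    for (MiniApp app : applications.values()) {
      int fit = Math.min(app.pendingContainers, node.availableMb / app.containerMb);
      for (int i = 0; i < fit; i++) {
        node.availableMb -= app.containerMb;
        app.pendingContainers--;
        app.addAllocation(node.name + ":" + app.containerMb + "MB");
        assigned++;
      }
      // Stop once the node cannot hold even a minimum-size container.
      if (node.availableMb < minimumAllocationMb) {
        break;
      }
    }
    return assigned;
  }
}

class MiniFifoSchedulerDemo {
  public static void main(String[] args) {
    MiniFifoScheduler scheduler = new MiniFifoScheduler(1024);
    MiniApp first = new MiniApp("app1", 3, 1024);
    MiniApp second = new MiniApp("app2", 2, 2048);
    scheduler.addApplication(first);
    scheduler.addApplication(second);
    scheduler.nodeUpdate(new FifoNode("nodeA", 4096));
    System.out.println("app1 pulls " + first.pullNewlyAllocated());
    System.out.println("app2 pulls " + second.pullNewlyAllocated());
  }
}
```

In this model the earlier application is served first on every heartbeat, and a later application only receives containers once the earlier one has nothing pending or the node still has room.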
Original article: http://blog.csdn.net/u011750989/article/details/39152255