记录源码细节,内部有中文注释
Client 端:
//最终通过ApplicationClientProtocol协议提交到RM端的ClientRMService内 package org.apache.hadoop.mapred; jobclient包内 YarnRunner public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, InterruptedException { addHistoryToken(ts); // Construct necessary information to start the MR AM ApplicationSubmissionContext appContext = createApplicationSubmissionContext(conf, jobSubmitDir, ts); // Submit to ResourceManager try { ApplicationId applicationId = resMgrDelegate.submitApplication(appContext); //提交作业 ApplicationReport appMaster = resMgrDelegate .getApplicationReport(applicationId); ResourceMgrDelegate类 public ApplicationId submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException { return client.submitApplication(appContext); } public ResourceMgrDelegate(YarnConfiguration conf) { super(ResourceMgrDelegate.class.getName()); this.conf = conf; this.client = YarnClient.createYarnClient(); //该方法会创建YarnClientImpl,具体提交逻辑在该类里 init(conf); start(); } YarnClientImpl类 public ApplicationId submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException { ApplicationId applicationId = appContext.getApplicationId(); appContext.setApplicationId(applicationId); SubmitApplicationRequest request = Records.newRecord(SubmitApplicationRequest.class); request.setApplicationSubmissionContext(appContext); rmClient.submitApplication(request); //ApplicationClientProtocol rmClient
RM端:
//提交只是往中央异步处理器加入RMAppEventType.START事件,异步处理,之后不等待处理结果,直接返回个简单的response ClientRMService内: public SubmitApplicationResponse submitApplication( SubmitApplicationRequest request) throws YarnException { ApplicationSubmissionContext submissionContext = request .getApplicationSubmissionContext(); ApplicationId applicationId = submissionContext.getApplicationId(); ..... } } try { // call RMAppManager to submit application directly rmAppManager.submitApplication(submissionContext, System.currentTimeMillis(), false, user); //作业提交,调用的是RMAppManager中方法 LOG.info("Application with id " + applicationId.getId() + " submitted by user " + user); RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST, "ClientRMService", applicationId); } catch (YarnException e) { LOG.info("Exception in submitting application with id " + applicationId.getId(), e); RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, e.getMessage(), "ClientRMService", "Exception in submitting application", applicationId); throw e; } ... SubmitApplicationResponse response = recordFactory .newRecordInstance(SubmitApplicationResponse.class); return response; protected void submitApplication( ApplicationSubmissionContext submissionContext, long submitTime, boolean isRecovered, String user) throws YarnException { ...... // Create RMApp RMApp application = new RMAppImpl(applicationId, rmContext, this.conf, submissionContext.getApplicationName(), user, submissionContext.getQueue(), submissionContext, this.scheduler, this.masterService, submitTime, submissionContext.getApplicationType()); .... } // All done, start the RMApp this.rmContext.getDispatcher().getEventHandler().handle( new RMAppEvent(applicationId, isRecovered ? RMAppEventType.RECOVER: RMAppEventType.START)); //往异步处理器增加个RMAppEvent事件,类型枚举值为RMAppEventType.START //在RM内部会注册该类型的事件由什么处理器来处理 } 在RM内部 // Register event handler for RmAppEvents this.rmDispatcher.register(RMAppEventType.class, new ApplicationEventDispatcher(this.rmContext)); ... 
//ApplicationEventDispatcher,最终会调用到RMAppImpl来处理这个事件 public void handle(RMAppEvent event) { this.writeLock.lock(); /* 此处event类型为RMAppEventType.START */ try { ApplicationId appID = event.getApplicationId(); LOG.debug("Processing event for " + appID + " of type " + event.getType()); final RMAppState oldState = getState(); try { /* keep the master in sync with the state machine */ this.stateMachine.doTransition(event.getType(), event); //stateMachine通过状态工厂创建,状态工厂核心addTransition //各种状态转变对应的处理器,有个submit应该是对应到RMAppEventType.START } catch (InvalidStateTransitonException e) { LOG.error("Can't handle this event at current state", e); private static final class StartAppAttemptTransition extends RMAppTransition { public void transition(RMAppImpl app, RMAppEvent event) { if (event.getType().equals(RMAppEventType.APP_SAVED)) { assert app.getState().equals(RMAppState.NEW_SAVING); RMAppStoredEvent storeEvent = (RMAppStoredEvent) event; if(storeEvent.getStoredException() != null) { // For HA this exception needs to be handled by giving up // master status if we got fenced LOG.error("Failed to store application: " + storeEvent.getApplicationId(), storeEvent.getStoredException()); ExitUtil.terminate(1, storeEvent.getStoredException()); } } app.createNewAttempt(true); // }; } private void createNewAttempt(boolean startAttempt) { ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1); RMAppAttempt attempt = new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService, submissionContext, conf, user); //新建个RMAppAttemptImpl attempts.put(appAttemptId, attempt); currentAttempt = attempt; if(startAttempt) { handler.handle( new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));//此处是RMAppAttemptEvent加入异步处理器的队列 //RM register可以看到其对应的处理器,最终调用的是RMAppAttemptImpl的handle方法 } RMAppAttemptImpl类: public void handle(RMAppAttemptEvent event) { this.writeLock.lock(); try { ApplicationAttemptId appAttemptID = event.getApplicationAttemptId(); 
LOG.debug("Processing event for " + appAttemptID + " of type " + event.getType()); final RMAppAttemptState oldState = getAppAttemptState(); try { /* keep the master in sync with the state machine */ this.stateMachine.doTransition(event.getType(), event); // } catch (InvalidStateTransitonException e) { .. 其中状态机有 .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED, RMAppAttemptEventType.START, new AttemptStartedTransition()) AttemptStartedTransition的 Transition方法 ... // Add the application to the scheduler appAttempt.eventHandler.handle( new AppAddedSchedulerEvent(appAttempt.applicationAttemptId, appAttempt.submissionContext.getQueue(), appAttempt.user)) //该事件即是SchedulerEventType,会交给schedulerDispatcher //该对象赋值SchedulerEventDispatcher,它在内部又维护了个类似中央异步处理器的结构,run方法内都统一通过scheduler处理事件 //查看FIFO Scheduler的handle方法: case APP_ADDED: { AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent .getUser()); // } private synchronized void addApplication(ApplicationAttemptId appAttemptId, String user) { // TODO: Fix store FiCaSchedulerApp schedulerApp = new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager, this.rmContext); applications.put(appAttemptId, schedulerApp); metrics.submitApp(user, appAttemptId.getAttemptId()); LOG.info("Application Submission: " + appAttemptId.getApplicationId() + " from " + user + ", currently active: " + applications.size()); rmContext.getDispatcher().getEventHandler().handle( new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.APP_ACCEPTED)); //又是个新的状态,最终RM的ApplicationMasterLauncher与NM通信 //启动AM,AM又向RM注册,那AM初始化各个map task,reduce task是怎么做的呢 } //该事件会由ApplicationAttemptEventDispatcher来处理,在register里注册,会调用RMAppAttemptImpl.handle来处理 public void handle(RMAppAttemptEvent event) { this.writeLock.lock(); try { ApplicationAttemptId appAttemptID = event.getApplicationAttemptId(); LOG.debug("Processing event for " + appAttemptID + " of type " + 
event.getType()); final RMAppAttemptState oldState = getAppAttemptState(); try { /* keep the master in sync with the state machine */ this.stateMachine.doTransition(event.getType(), event); // RMAppAttemptEventType.APP_ACCEPTED会激发从什么状态到什么状态,然后执行什么事件,由.addTransition定义 //会到SCHEDULED状态,再通过CONTAINER_ALLOCATED事件到ALLOCATED_SAVING状态,再通过CONTAINER_ACQUIRED到 //ALLOCATED状态,再通过LAUNCHED事件到LAUNCHED状态 比如: .addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.ALLOCATED_SAVING, RMAppAttemptEventType.CONTAINER_ALLOCATED, new AMContainerAllocatedTransition()) //CONTAINER_ALLOCATED会激发SCHEDULED到ALLOCATED_SAVING状态,并执行AMContainerAllocatedTransition //最后会在nm端启动appmaster,appmaster会初始化一系列map,reduce task,再向RM注册,向RM发送heartbeat //为task请求资源,注意心跳可能没有新的请求资源信息,只是从RM内存结构里取已经分配好的资源 //注意NM心跳到达时,也会执行资源分配,结果保留在内存结构,等appmaster来取 关键是状态机RMAppImpl、RMAppAttemptImpl,内部会定义一系列的状态到状态的转换及对应的处理类
原文地址:http://blog.csdn.net/u011750989/article/details/39077743