
MR-Job Submission Flow

Date: 2015-08-20 23:59:34


1. The entry point of a standard MR job:

// Passing true makes the client monitor the job and print the progress of the job and its tasks
System.exit(job.waitForCompletion(true) ? 0 : 1);

2. Internal implementation of job.waitForCompletion(true)

// Internal implementation of job.waitForCompletion()
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit(); // the core of this method is the call to submit()
    }
    if (verbose) { // depending on the argument, print the job's detailed progress
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
}
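To make the non-verbose branch concrete, here is a toy model in plain Java with no Hadoop dependency. ToyPollingJob and its methods are hypothetical stand-ins for Job, submit(), isComplete() and isSuccessful(), and the model simply assumes the job finishes after a fixed number of polls. One deliberate difference: Hadoop's loop ignores InterruptedException, while this sketch restores the interrupt flag and gives up.

```java
// Toy model of Job.waitForCompletion(); all names here are hypothetical.
class ToyPollingJob {
    enum State { DEFINE, RUNNING }

    private State state = State.DEFINE;
    private final int pollsUntilDone; // isComplete() returns false this many times first
    private final boolean succeeds;   // value reported by isSuccessful()
    private int polls = 0;
    private int submissions = 0;

    ToyPollingJob(int pollsUntilDone, boolean succeeds) {
        this.pollsUntilDone = pollsUntilDone;
        this.succeeds = succeeds;
    }

    private void submit() {
        submissions++;
        state = State.RUNNING;
    }

    private boolean isComplete() {
        polls++;
        return polls > pollsUntilDone;
    }

    private boolean isSuccessful() {
        return succeeds;
    }

    // Non-verbose branch: submit once if still in DEFINE, then poll until done.
    boolean waitForCompletion(long pollIntervalMillis) {
        if (state == State.DEFINE) {
            submit();
        }
        while (!isComplete()) {
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException e) {
                // Hadoop ignores the interrupt here; this sketch restores it and gives up.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return isSuccessful();
    }

    int polls() { return polls; }
    int submissions() { return submissions; }
    State state() { return state; }
}
```

The `? 0 : 1` expression from step 1 then turns the returned boolean into the process exit code, and a second call does not resubmit because the state is no longer DEFINE.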

3. Internal implementation of the Job class's submit() method

public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);    
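    setUseNewAPI(); // use the new MapReduce API
    // connect() initializes the Job's Cluster field: the client-side proxy
    // used for RPC communication with the server side (the RM in YARN mode)
    connect();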
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {

        // submit the job
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING; // set the job state to RUNNING
    LOG.info("The url to track the job: " + getTrackingURL());
}
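The state guard in submit() can be sketched the same way. In this hypothetical model, ensureState() throws an IllegalStateException when the job is not in DEFINE, and a Supplier<String> stands in for submitter.submitJobInternal(...) run inside ugi.doAs(...). Because the state is only set to RUNNING after the submitter returns, a failed submission leaves the job in DEFINE.

```java
import java.util.function.Supplier;

// Toy model of the state guard in Job.submit(); names are hypothetical.
class ToySubmitJob {
    enum JobState { DEFINE, RUNNING }

    private JobState state = JobState.DEFINE;
    private String status; // stands in for the JobStatus returned by the submitter

    private void ensureState(JobState expected) {
        if (state != expected) {
            throw new IllegalStateException(
                "Job in state " + state + " instead of " + expected);
        }
    }

    // submitter stands in for submitter.submitJobInternal(Job.this, cluster)
    void submit(Supplier<String> submitter) {
        ensureState(JobState.DEFINE);
        status = submitter.get();  // if this throws, state stays DEFINE
        state = JobState.RUNNING;
    }

    JobState state() { return state; }
    String status() { return status; }
}
```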

3.1.1. Internal implementation of connect()

private synchronized void connect()
        throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster = 
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException, 
                                 ClassNotFoundException {
                     // Create a Cluster object and keep it as a field of Job,
                     // i.e. Job holds a reference to its Cluster.
                     return new Cluster(getConfiguration());
                   }
                 });
    }
}
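connect() is a lazily initialized, synchronized field: the first call creates the Cluster and later calls reuse it. A minimal sketch of that pattern, assuming a hypothetical Supplier in place of `new Cluster(getConfiguration())` and leaving out the ugi.doAs(...) wrapper:

```java
import java.util.function.Supplier;

// Toy model of Job.connect(): lazy, synchronized, create-once initialization.
class ToyConnectingJob<C> {
    private final Supplier<C> clusterFactory; // stands in for () -> new Cluster(getConfiguration())
    private C cluster;                         // the reference Job keeps to its Cluster
    private int creations = 0;

    ToyConnectingJob(Supplier<C> clusterFactory) {
        this.clusterFactory = clusterFactory;
    }

    synchronized void connect() {
        if (cluster == null) {   // only the first call creates the Cluster
            cluster = clusterFactory.get();
            creations++;
        }
    }

    synchronized C cluster() { return cluster; }
    synchronized int creations() { return creations; }
}
```

Making the method synchronized means two threads calling connect() at the same time still create only one Cluster.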

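3.1.2. Implementation of new Cluster(): the Cluster(Configuration) constructor called in connect() delegates to this two-argument constructor, passing null for jobTrackAddr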

public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
      throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    initialize(jobTrackAddr, conf); // the key logic lives inside this method
}

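3.1.3. Inside initialize(), Cluster obtains its client-side proxy from one of two ClientProtocolProvider implementations: LocalClientProtocolProvider (local mode) or YarnClientProtocolProvider (YARN mode). The providers are loaded through a ServiceLoader (frameworkLoader), and the first one that returns a non-null ClientProtocol is used: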

// Excerpt from Cluster.initialize(jobTrackAddr, conf)
synchronized (frameworkLoader) {
  for (ClientProtocolProvider provider : frameworkLoader) {
    LOG.debug("Trying ClientProtocolProvider : "
        + provider.getClass().getName());

    // ClientProtocol is the RPC protocol between the MapReduce client and the
    // cluster (implemented by LocalJobRunner in local mode and by YARNRunner,
    // which talks to the RM, in YARN mode). As a Hadoop RPC protocol it must
    // declare a versionID field.
    ClientProtocol clientProtocol = null;
    try {
      if (jobTrackAddr == null) {
        clientProtocol = provider.create(conf);
      } else {
        clientProtocol = provider.create(jobTrackAddr, conf);
      }
      if (clientProtocol != null) {
        // initialize Cluster's member fields
        clientProtocolProvider = provider;
        client = clientProtocol; // the client-side proxy held by Cluster
        LOG.debug("Picked " + provider.getClass().getName()
            + " as the ClientProtocolProvider");
        break;
      } else {
        LOG.debug("Cannot pick " + provider.getClass().getName()
            + " as the ClientProtocolProvider - returned null protocol");
      }
    } catch (Exception e) {
      LOG.info("Failed to use " + provider.getClass().getName()
          + " due to error: " + e.getMessage());
    }
  }
}
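The selection loop can be modeled without Hadoop. In this sketch the configuration is a plain Map, providers return a String instead of a ClientProtocol, and the names ToyProvider, ToyClusterInit, LOCAL and YARN are hypothetical. The two lambdas roughly follow the rule the real providers apply to mapreduce.framework.name: the local provider accepts "local" (also the default when the key is unset), and the YARN provider accepts only "yarn". The exception thrown when nothing matches is assumed to mirror the check Hadoop performs after this loop.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ClientProtocolProvider: returns a protocol name,
// or null when the provider does not handle this configuration.
interface ToyProvider {
    String create(Map<String, String> conf) throws Exception;
}

final class ToyClusterInit {
    static final String FRAMEWORK_KEY = "mapreduce.framework.name";

    // Local provider: accepts "local", which is also the default when the key is unset.
    static final ToyProvider LOCAL = conf ->
        "local".equals(conf.getOrDefault(FRAMEWORK_KEY, "local")) ? "LocalJobRunner" : null;

    // YARN provider: accepts only an explicit "yarn".
    static final ToyProvider YARN = conf ->
        "yarn".equals(conf.get(FRAMEWORK_KEY)) ? "YARNRunner" : null;

    private ToyClusterInit() {}

    // Same shape as the loop in Cluster.initialize(): the first non-null result wins,
    // a provider that throws is logged and skipped, and no match at all is an error.
    static String pick(List<ToyProvider> providers, Map<String, String> conf) {
        for (ToyProvider provider : providers) {
            try {
                String client = provider.create(conf);
                if (client != null) {
                    return client;
                }
            } catch (Exception e) {
                System.err.println("Failed to use provider due to error: " + e.getMessage());
            }
        }
        throw new IllegalStateException(
            "Cannot initialize Cluster; check " + FRAMEWORK_KEY);
    }
}
```

With the key unset the local provider wins; with mapreduce.framework.name=yarn the local provider returns null and the YARN provider is picked, which is why the same job code can run in either mode.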

3.1.4. The versionID field declared in the ClientProtocol interface

//Version 37: More efficient serialization format for framework counters
public static final long versionID = 37L;
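The versionID lets both ends of an RPC connection detect that they were built against different versions of the protocol. The following is a simplified, hypothetical handshake, not Hadoop's RPC code (the real check lives in Hadoop's RPC layer and differs in detail): the toy server reports its version through a method shaped like VersionedProtocol.getProtocolVersion(String, long), and the client rejects a mismatch.

```java
// Hypothetical stand-in for a versioned protocol interface; interface fields
// are implicitly public static final, like ClientProtocol.versionID.
interface ToyClientProtocol {
    long versionID = 37L;
}

// Toy server that reports the protocol version it was built with.
final class ToyRpcServer {
    private final long serverVersion;

    ToyRpcServer(long serverVersion) {
        this.serverVersion = serverVersion;
    }

    // Shaped like VersionedProtocol.getProtocolVersion(String, long).
    long getProtocolVersion(String protocol, long clientVersion) {
        return serverVersion;
    }
}

final class ToyRpcClient {
    private ToyRpcClient() {}

    // Client-side check: refuse to talk to a server with a different versionID.
    static void checkVersion(ToyRpcServer server) {
        long serverVersion = server.getProtocolVersion(
            ToyClientProtocol.class.getName(), ToyClientProtocol.versionID);
        if (serverVersion != ToyClientProtocol.versionID) {
            throw new IllegalStateException("Protocol version mismatch: client="
                + ToyClientProtocol.versionID + ", server=" + serverVersion);
        }
    }
}
```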

3.2.1. Implementation of submitJobInternal() in the JobSubmitter class:

 

Original article: http://www.cnblogs.com/skyl/p/4746446.html
