码迷,mamicode.com
首页 > 其他好文 > 详细

MapReduce的执行过程.

时间:2015-04-21 01:45:44      阅读:131      评论:0      收藏:0      [点我收藏+]

标签:

作业在运行时,数据或者是作业调用的一个运行图.

    技术分享

  用户写的代码通过JobClient提交给JobTracker
  Job对象中封装了JobClient
  JobConf和我们的Job对象几乎是一回事.
  把我们的代码打包成jar包,上传到hdfs中,JobClient就会获得一个jar包在hdfs中的一个路径.它会把我们jar包的一个路径告诉我们的JobTracker,告诉JobTracker之后,就为jar添加一个新的Job对象,job对象就会使用一个类,JobInProgress类进行跟踪,Job对象中里边有Mapper和Reducer任务,Mapper是map任务,Reducer是reduce任务,在运行时也需要关注这些map任务和reduce任务,所以我们JobTracker这,JobInProgress里边又创建几个TaskInProgress来跟踪我们的map任务或reduce任务,刚开始之后需要创建这个任务,创建这个任务JobTracker是不会干的,会把这个信息交给TaskTracker来执行,在TaskTracker这边启用一个TaskRunner这个类创建我们的这个Task(有可能是map任务,也有可能是reduce任务),并且去运行,这个Task到底是什么任务在我们这个程序中是不重要的.
  TaskRunner去创建一个子进程createChild,根据传过来的是map任务或者是reduce任务,去创建Map任务或者是reduce任务.首先会创建MapTask,在创建任务之前首先要找到这个代码的定义,我们的代码的定义在hdfs上,会把这个jar包给下载下来,根据传过来的TaskInProgress的定义,找到对应的Map任务,然后启动起来,之后就会调用Map方法.Map方法处理的数据,来自hdfs中输入路径,会把这个数据进行处理,转换成新的键值对输出到LocalFS(linux本地),有可能会去执行Combiner(规约),不管归不归约,数据还是会放在linux本地磁盘,Map任务就处理完了.
  处理完Map之后就会启动ReducerTask,Reduce这个任务,就会跑reduce这个程序,reduce会读取map节点上linux本地的这个数据.(Mapper任务和Reducer任务不见得会是同一台机器),reduce然后去处理,处理完之后,写入到hdfs中的目录.
  TaskRunner怎么知道Mapper任务执行完了去执行reducer任务?
    TaskRunner里边的决定权在于JobTracker,JobTracker告诉它,它才会去执行.
  TaskTracker这边会有一个心跳的机制,心跳机制会把我们的TaskTracker这些机器的状态信息都告诉给JobTracker,JobTracker根据这个状态信息做判断,告诉给TaskTracker,去执行.
  我们执行hadoop jar 这个命令时,实际上会把我们的jar包上传到hdfs当中,程序之间的运行,传递的都是jar 地址,在TaskTracker这块,要跑map任务和reducer任务时,解析jar包,找到MyMapper和MyReducer的定义,实例化  MyMapper和MyReducer这个对象,然后去执行其中的Map方法或者是Reducer方法.
  jar包仅仅是类定义的一个文件,作业运行时,是一个动态的,从jar包中找到类定义信息,通过反射机制实例化MyMapper和MyReducer类,然后去调用其中的Map方法和Reducer方法.TaskTracker应该也是把jar下载下来,要不然是找不到类定义信息.我们自己的MyMapper和MyReducer这个类,会被TaskInProgroess这个对象来管理,实际我们MyMapper和MyReducer仅仅是两个类,这两个类,是有共同的基类的.基类是Task,根据是Map或者是Reducer分为MapTask和ReducerTask,我们在这里看到的是MyMapper和MyReducer,实际上程序在运行时,是MapTask类,ReducerTask类.

  JobClient:客户端,JobTracker:服务端,JobTracker和JobClient之间是可以互相通信的,JobSubmissionProtocol相当于我们的客户端与服务端进行调用的接口.方法的调用是在JobClient中的,方法的执行时发生在服务端JobTracker.
  为什么要把jar包放在hdfs上?
    hdfs除了可以存储海量数据,还可以作为共享数据.
  RPC中我们的服务端就叫JobTracker,我们的客户端就是JobClient,之间要进行通信的话还是RPC的这个机制,只不过这个机制在传递的时候访问的这个协议是JobSubmissioonProtocol

     技术分享

    从图中可以看出JobClient在我们的job中,而我们的Job是在我们的当前应用中,我们代码中new Job() 实际上也获得一个JobClient.
    JobClient提交作业在我们客户端就是把我们的应用提交过去了,提交应用实际上是先获得一个JobId,然后再把信息写到hdfs中,然后才告结束.
    JobClient就相当于我们的Job对象,因为我们的JobClient是在Job这创建的.Job是在我们java代码中创建的.
    本质就是JobClient客户端调用服务端JobTracker对象中的方法.
    客户端提交作业之后,JobTracker先把我们的作业创建一个JobInProgress对象进行管理,然后把JobInProgress放入调度器中供调用.调度器简单的认为就是一个队列,先进先出的队列,进入这个队列之后就会从这个队列中取出这个对象,然后运行它.运行的方法实际上就是实例化一个它,在和我们的TaskTracker来进行通信,TaskTracker和JobTracker依然采用的是我们的RPC的通信机制,只是我们在JobClient和JobTracker之间通信的话用的是JObSubmissionProtocal,在我们JobTracker和TaskTracker通信的时候用的是InterTrackerProtocol
TaskTracker性质上也属于我们的客户端,如果和我们的JobTracker通信的话,意味着也会有一个协议,就叫做InterTrackerProtocol
    InterTrackerProtocol主要做:heartbeat,周期性被我们的TaskTracker调用,调用了之后去更新我们的任务状态.JobTracker就会返回一个HeartbeatResponse对象,告诉TaskTracker去处理一系列的对象,JobTracker要给TaskTracker分配任务也是使用heartbeat来做的.hearbeat会返回很多的HeartbeatResponse,方法的执行就是在TaskTracker中执行的.发送心跳之后,JobTracker就会知道TaskTracker一切信息.
    加载一个新的任务在图中是说JobTracker委托给TaskTracker去干的.通过心跳机制,我们的JobTracker送给我们的TaskTracker一个LaunchTaskAction这个命令,拿到这个命令之后就会调用addToTaskQueue这个方法放入到队列中,在addToTaskQueue队列中有一个mapLaunch和一个reducerLauncher.
mapLaunch和reducerLauncher实际上是从我们的hdfs中读我们的jar包数据,读我们的map定义和reducer定义.
JobTracker和TaskTracker之间的通信是通过TaskTracker的heartbeat这种周期性的心跳来实现的.
    TaskTracker 发送heartbear是有间隔,有规律的跳动的,
    如果上一次的发送心跳时间加上心跳间隔减去当前时间如果大于0,说明是在心跳生命周期之内,就会去休眠,如果超过这个心跳周期,就不需要休眠,执行下面代码,然后下面的代码就是去发送心跳.
    如果这个间隔小于0,就一个劲的循环不出来,如果超时,就break退出.
    JobTracker和TaskTracker是如何通信?发送心跳.
      TaskTracker会把他自身的一些信息汇报给JobTracker,汇报的方式就是通过心跳机制,它把它自身自身的信息都通过心跳方法heartbeat方法的形参告诉给JobTracker,JobTracker会汇总它的信息,告诉TaskTracker去干什么事,到了TaskTracker之间就会去处理.
    jobTracker如何和hdfs进行通信?
      JObTracker这边使用FileSystem这个类,因为我们在代码提交的时候,我们把jar包的路径,输入文件的路径,以及输出文件的路径都写到我们的这个配置中了,那么我们的JobTracker就拿到配置,在TaskTracker也可以拿到,JobTracker拿到配置信息,就是使用FileSystem去访问HDFS.我们把JobTracker不当成老大,当成hdfs的客户端.JobTracker和我们的TaskTracker和    我们的其他的代码都是我们的hdfs的客户端.
    TaskRunner是一个线程,基类,在一个单独的进程中,所以运行任务是在一个独立的进程中,目的是隔离map和reduce的系统代码.map和reduce是一个进程.他们之间也没有线程之间的问题.因为他们都是独立的进程,用来隔离这些代码的.
    launchJvmAndWait():方法,启动的是进程,加载的是JVM,map任务和reduce任务实际上是一个MapTask和ReduceTask.
程序在运行起来实际上一个叫MapTask,一个叫ReduceTask,他们有一个共同的类,叫做Task类.中含有createRunner,返回一个基类TaskRunner,TaskRunner下边有MapTaskRunner和ReduceTaskRunner.在TaskRunner中有一个方法,执行新的Map任务(runNewMapper)还是执行旧的Map任务(runOldMapper),我们的这个Mapper类要运行的话,实际上是反射实例化这个对象,然后去执行.
    我们写的Mapper方法继承自Mapper类,
    类中的map方法需要我们去实现,这个方法只被调用一次.在这里我们只需要覆盖这个方法,把自己的业务逻辑把我们的代码给写出去的.
      setup(),在我们任务开始的时候会被调用一次.
      cleanup(),在任务结束的时候会被调用一次.在我们整个map任务开始的时候会调用setup(),任务结束的时候会调用cleanup(),在map任务中间会调用map(),类似于javaWeb中的filter.

MapReduce的执行过程.

标签:

原文地址:http://www.cnblogs.com/xiaolong1032/p/4443165.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!