1含有dagScheduler的runJob函数的runJob是入口,并且是堵塞的操作,即直到Spark完成Job的运行之前,rdd.doCheckpoint()是不会执行的。堵塞在3的waiter.awaitResult()操作,即submitJob会返回一个waiter对象,而awaitResult()就堵塞了。其中resultHandler参数是下面runjob构造的回调函数,这里是没有返回值的。
2runjob,构造Array,并将函数对象"(index, res) => results(index) = res"继续传递给runJob函数(即在runJob添加一个回调函数,将runJob的运行结果保存到Array),等待runJob函数运行结束,将results返回。
3最终导致了DAGScheduler中的submitJob中,给这次job分配了一个jobID, 通过创建了一个JobWaiter对象,返回给1中。调用eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter,SerializationUtils.clone(properties)))向DAG调度器 发送一个case class JobSubmitted的消息。
4消息包包含:jobId,rdd,func2,partitions.toArray,callSite/properties:个人不是很感兴趣,姑且理解为不重要的
waiter就是上面创建的JobWaiter对象,这个很重要,因为这个对象封装了几个重要的参数:jobId:Job编号,partitions.size:分区编号(它需要接受到partitions.size个归属于jobid的task成功运行的结果,并通过resultHandler来将这些task运行结果回调给的Array),resultHandler:回调函数
5消息循环器 会不断的检查有没有消息要处理,并调用handlerjobsubmitted来处理该消息,开始划分stages。HandleJobSubmitted 生成finalStage后,就会为该Job生成一个ActiveJob,保存当前Job的一些信息,同时调用submitStage来提交Stage。
(这里Stage的划分是对一个Job里面一系列RDD转换和动作进行划分。首先job是因动作而产生,因此每个job肯定都有一个ResultStage,否则job就不会启动。其次,如果Job内部RDD之间存在宽依赖,Spark会针对它产生一个中间Stage,即为ShuffleStage,严格来说应该是ShuffleMapStage,这个stage是针对父RDD而产生的, 相当于在父RDD上做一个父rdd.map().collect()的操作。ShuffleMapStage生成的map输入,对于子RDD,如果检测到所自己所“宽依赖”的stage完成计算,就可以启动一个shuffleFectch, 从而将父RDD输出的数据拉取过程,进行后续的计算。因此一个Job由一个ResultStage和多个ShuffleMapStage组成。
finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite))
(这里的newResultStage是如何划分task(一个partition就是一个task)?)
6首先是在当前的rdd上调用getParentStagesAndId来生成父Stage,父Stages是一个列表
然后就创建一个Stage对象,并更新Stage和job之间的关系.
stage类:
ShuffleMapStage:
shuffleDep:该stage生成的原因
parents:父stage列表
ShuffleMapStage.class