Spark Source Code Analysis, Part 2: The Job Scheduling Model and Execution Feedback
In 《Spark源码分析之Job提交运行总流程概述》 we mentioned that the first phase of Job submission and execution, Stage division and submission, can itself be broken into three sub-phases:
1. the Job scheduling model and execution feedback;
2. Stage division;
3. Stage submission, i.e. generation of the corresponding TaskSets.
In this post we walk through the source code of the first sub-phase: the Job scheduling model and execution feedback.
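Before diving into the scheduler itself, it helps to recall where runJob() comes from on the driver side: every RDD action ends up calling SparkContext.runJob(), which delegates to DAGScheduler.runJob(). The following is a minimal driver program that triggers exactly one such job; it is only a sketch (the app name and local master are arbitrary choices for illustration):

import org.apache.spark.{SparkConf, SparkContext}

object RunJobDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RunJobDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(1 to 100, numSlices = 4)

    // count() is an action: it calls SparkContext.runJob(), which in turn
    // delegates to DAGScheduler.runJob(), the entry point analyzed below.
    val n = rdd.count()
    println(s"counted $n elements")

    sc.stop()
  }
}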

First, the DAGScheduler is responsible for submitting the Job to the event queue eventProcessLoop, where it waits to be scheduled and executed. The entry point is DAGScheduler's runJob() method. The code is as follows:
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  // record the start time so the total job duration can be reported below
  val start = System.nanoTime
  // submit the job and get back a JobWaiter used to monitor its result
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  // block until the job finishes, then log success or failure
  waiter.awaitResult() match {
    case JobSucceeded =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case JobFailed(exception: Exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // rethrow with the caller's stack trace appended, so the user can see
      // where in their own code the failed job was triggered
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}
The runJob() method does just three things:
First, it records the start time, so that the Job's total running time can be computed at the end;
Second, it calls submitJob() to submit the Job, which returns a JobWaiter object named waiter;
Finally, waiter calls JobWaiter's awaitResult() method to wait for the Job's result, which can only be one of two outcomes: JobSucceeded for success, or JobFailed for failure.
awaitResult() checks the flag _jobFinished in a loop: while it is false, it blocks on this.wait(); once the flag is set, the Job has finished and the JobResult is returned. The code is as follows:
def awaitResult(): JobResult = synchronized {
  while (!_jobFinished) {
    this.wait()
  }
  return jobResult
}
The flag _jobFinished is set as tasks complete: either when the number of finished tasks equals the total number of tasks, or when the Job as a whole fails. Together with the flag, the Job result jobResult is set as well. The code is as follows:
override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
  if (_jobFinished) {
    throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
  }
  resultHandler(index, result.asInstanceOf[T])
  finishedTasks += 1
  if (finishedTasks == totalTasks) {
    // all tasks have finished: mark the job as done, record success,
    // and wake up the thread blocked in awaitResult()
    _jobFinished = true
    jobResult = JobSucceeded
    this.notifyAll()
  }
}

override def jobFailed(exception: Exception): Unit = synchronized {
  // the job failed as a whole: mark it as finished, record the failure,
  // and wake up the thread blocked in awaitResult()
  _jobFinished = true
  jobResult = JobFailed(exception)
  this.notifyAll()
}
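JobWaiter's blocking behavior is the classic Java monitor pattern: the waiting side loops on a flag inside a synchronized block and calls wait(), while the completing side flips the flag and calls notifyAll() under the same lock. The following is a standalone sketch of that pattern, not Spark code; the class and method names are invented for illustration:

// A tiny waiter in the spirit of JobWaiter: await() blocks until finish() is called.
class SimpleWaiter {
  private var finished = false
  private var result: String = _

  def await(): String = synchronized {
    while (!finished) {
      wait()          // releases the lock and blocks until notifyAll()
    }
    result
  }

  def finish(value: String): Unit = synchronized {
    finished = true
    result = value
    notifyAll()       // wake up every thread blocked in await()
  }
}

// Usage: one thread waits, another finishes the "job".
object WaiterDemo {
  def main(args: Array[String]): Unit = {
    val waiter = new SimpleWaiter
    new Thread(new Runnable {
      override def run(): Unit = {
        Thread.sleep(200)
        waiter.finish("JobSucceeded")
      }
    }).start()
    println(waiter.await())   // prints: JobSucceeded
  }
}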
Next, let's look at the submitJob() method, defined as follows:
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // check the partition indices so we don't launch a task on a partition that does not exist
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
      "Total number of partitions: " + maxPartitions)
  }
  // generate a new jobId
  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // nothing to run: return a JobWaiter for zero tasks immediately
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }
  assert(partitions.size > 0)
  // cast func so it can be carried inside a JobSubmitted event
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  // create the JobWaiter that the caller will use to monitor the job's result
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  // post a JobSubmitted event to the event queue
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}
The submitJob() method does five things altogether:
First, input validation: it checks the requested partitions against the RDD's partitions, to make sure we don't launch a task on a non-existent partition (see the sketch of this idiom after the list), and if partitions is empty, i.e. there is nothing to run, it returns a JobWaiter right away;
Second, it generates a jobId for the Job; nextJobId is an AtomicInteger, so getAndIncrement() is atomic and the id increases by one for every job;
Third, it casts func, since JobSubmitted cannot accept the typed func parameter directly; the type parameter T is replaced with the wildcard _;
Fourth, it creates a JobWaiter object named waiter, which is returned to the caller at the end of the method and is used to monitor the Job's outcome;
Fifth, it posts a JobSubmitted event to the event queue eventProcessLoop, where it waits for the worker thread to pick it up and dispatch it (which normally happens almost immediately).
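As an aside, the partition check above uses Scala's Option in a slightly non-obvious way: find returns an Option, and foreach runs its block only if an out-of-range partition was actually found. The following is a small standalone sketch of the same idiom, not Spark code; the function name is invented for illustration:

// Throw only when an invalid index is present; do nothing otherwise.
def validatePartitions(requested: Seq[Int], maxPartitions: Int): Unit = {
  requested.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      s"Attempting to access a non-existent partition: $p. " +
      s"Total number of partitions: $maxPartitions")
  }
}

// Usage:
validatePartitions(Seq(0, 1, 2), maxPartitions = 4)   // ok, nothing happens
// validatePartitions(Seq(5), maxPartitions = 4)      // would throw IllegalArgumentException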
At this point it is worth looking into the event queue eventProcessLoop. It is of type DAGSchedulerEventProcessLoop, and it is defined and assigned when the DAGScheduler is initialized, as follows:
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
DAGSchedulerEventProcessLoop extends EventLoop, so let's look at EventLoop first. EventLoop is essentially an event queue plus a thin wrapper of operations around that queue. Internally, it first defines an event queue of type LinkedBlockingDeque whose elements are of type E; in the case of DAGSchedulerEventProcessLoop the stored events are of type DAGSchedulerEvent. The code is as follows:
private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
It also provides a background thread dedicated to taking events off the queue and handing each one to onReceive() for processing. The code is as follows:
private val eventThread = new Thread(name) {
  // run as a daemon thread so it does not keep the JVM alive
  setDaemon(true)

  override def run(): Unit = {
    try {
      while (!stopped.get) {
        // take() blocks until an event is available
        val event = eventQueue.take()
        try {
          // hand the event to the subclass for processing
          onReceive(event)
        } catch {
          case NonFatal(e) => {
            try {
              onError(e)
            } catch {
              case NonFatal(e) => logError("Unexpected error in " + name, e)
            }
          }
        }
      }
    } catch {
      case ie: InterruptedException => // exit the loop when the thread is interrupted
      case NonFatal(e) => logError("Unexpected error in " + name, e)
    }
  }
}
How do events get onto the queue? By calling the post() method with the event, as follows:
def post(event: E): Unit = {
  // simply enqueue the event; the background event thread will pick it up
  eventQueue.put(event)
}
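To make the pattern concrete, here is a minimal, self-contained sketch of the same producer/consumer structure, a daemon thread draining a LinkedBlockingDeque, written independently of Spark; the class and event names are invented for illustration, and stop() is simplified (Spark's real EventLoop also interrupts and joins the thread):

import java.util.concurrent.{BlockingQueue, LinkedBlockingDeque}

// A stripped-down event loop in the spirit of Spark's EventLoop (names are made up).
class SimpleEventLoop(name: String, handle: String => Unit) {
  private val queue: BlockingQueue[String] = new LinkedBlockingDeque[String]()
  @volatile private var stopped = false

  private val worker = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      while (!stopped) {
        val event = queue.take()   // blocks until an event arrives
        handle(event)
      }
    }
  }

  def start(): Unit = worker.start()
  def post(event: String): Unit = queue.put(event)
  def stop(): Unit = stopped = true
}

// Usage: the producer posts, the background thread consumes.
object EventLoopDemo {
  def main(args: Array[String]): Unit = {
    val loop = new SimpleEventLoop("demo-loop", e => println(s"handled: $e"))
    loop.start()
    loop.post("JobSubmitted")
    Thread.sleep(100)   // give the daemon thread a moment before the JVM exits
  }
}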
Back to the main thread of the analysis. As mentioned above, submitJob() uses eventProcessLoop's post() method to put a JobSubmitted event on the event queue. So how does DAGSchedulerEventProcessLoop handle a JobSubmitted event? Let's look at its onReceive() method; the source is as follows:
override def onReceive(event: DAGSchedulerEvent): Unit = {
  // time how long it takes to process this event
  val timerContext = timer.time()
  try {
    // delegate to doOnReceive() for the actual dispatch
    doOnReceive(event)
  } finally {
    timerContext.stop()
  }
}
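One detail worth noting is the timerContext: the processing of each event is wrapped in a metrics timer so that the time spent handling scheduler events can be reported. The time()/stop() pairing matches the standard Dropwizard (codahale) Metrics idiom; the following is a standalone sketch of that idiom, assuming the Dropwizard Metrics library is on the classpath, with a registry and metric name invented for illustration:

import com.codahale.metrics.{MetricRegistry, Timer}

object TimerDemo {
  def main(args: Array[String]): Unit = {
    val registry = new MetricRegistry()
    val timer: Timer = registry.timer("messageProcessingTime")

    // time() returns a Context; stop() records the elapsed time into the timer.
    val ctx = timer.time()
    try {
      Thread.sleep(50)   // stand-in for doOnReceive(event)
    } finally {
      ctx.stop()
    }
    println(s"recorded ${timer.getCount} timed event(s)")
  }
}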
Continuing into doOnReceive(), the code is as follows:
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  // a new job was submitted: handled by DAGScheduler's handleJobSubmitted()
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

  // a map stage was submitted on its own, identified by its dependency
  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

  case StageCancelled(stageId) =>
    dagScheduler.handleStageCancellation(stageId)

  case JobCancelled(jobId) =>
    dagScheduler.handleJobCancellation(jobId)

  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)

  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()

  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)

  case ExecutorLost(execId) =>
    dagScheduler.handleExecutorLost(execId, fetchFailed = false)

  case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)

  case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)

  case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
    dagScheduler.handleTaskCompletion(completion)

  case TaskSetFailed(taskSet, reason, exception) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

  case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
}
So a JobSubmitted event is handled by calling DAGScheduler's handleJobSubmitted() method.
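The dispatch above is plain Scala pattern matching over a family of event case classes (all subtypes of DAGSchedulerEvent). For readers less familiar with the idiom, here is a minimal, self-contained sketch with invented event names rather than Spark's actual types:

// Invented event hierarchy for illustration; Spark's real events are DAGSchedulerEvent subclasses.
sealed trait DemoEvent
case class DemoJobSubmitted(jobId: Int) extends DemoEvent
case class DemoJobCancelled(jobId: Int) extends DemoEvent
case object DemoResubmitFailedStages extends DemoEvent

def dispatch(event: DemoEvent): String = event match {
  case DemoJobSubmitted(id)     => s"handle submission of job $id"
  case DemoJobCancelled(id)     => s"handle cancellation of job $id"
  case DemoResubmitFailedStages => "resubmit failed stages"
}

// Usage:
println(dispatch(DemoJobSubmitted(0)))   // prints: handle submission of job 0

Each case is matched by the event's shape, and when the trait is sealed the compiler can warn about unhandled event types.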
That covers the first sub-phase, the Job scheduling model and execution feedback, in broad strokes. The second and third sub-phases will be analyzed in follow-up posts.
Original blog post: http://blog.csdn.net/lipeng_bigdata/article/details/50667966