码迷,mamicode.com
首页 > 其他好文 > 详细

第三十七课 Spark之Task执行原理及结果

时间:2016-05-19 15:15:33      阅读:160      评论:0      收藏:0      [点我收藏+]

标签:

主要内容

1.     Task执行原理流程图

2.     Task执行源码

3.     Task执行结果在Driver端的处理


一、Task在Executor(worker)端执行及返回Driver流程图

技术分享

图37-1 Driver端与Executor交互图

二、Executor(worker)端执行源码解析

1.接收Driver端发来的消息

       当Driver中的SchedulerBackend给ExecutorBackend发送LaunchTask之后,ExecutorBackend在接收到LaunchTask消息后,首先反序列化TaskDescription。

 

StandAlone下为SchedulerBackend具体指CoarseGrainedSchedulerBackend,ExecutorBackend指CoarseGrainedExecutorBackend。

//CoarseGrainedExecutorBackend#receive
case LaunchTask(data) =>
      if (executor == null) {
	//如果不存在Executor则会报错,退出系统
        logError("Received LaunchTask command but executor was null")
        System.exit(1)
      } else {
		//反序列化Task,得到TaskDescription信息
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
	//调用executor#launchTask在executor上加载任务
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
      }

2.Executor加载Task

Executor会通过launchTask来执行Task。

3.调用TaskRunner执行Task

//Executor#launchTask
 def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer): Unit = {
	//实例化一个TaskRunner对象来执行Task
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
      serializedTask)
	//将Task加入到正在运行的Task队列
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }


class TaskRunner(
      execBackend: ExecutorBackend,
      val taskId: Long,
      val attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer)
    extends Runnable {//省略非关键代码
override def run(): Unit = {
	//为我们的Task创建内存管理器
      val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
	//记录反序列化时间
      val deserializeStartTime = System.currentTimeMillis()
	//加载具体类时需要用到ClassLoader
      Thread.currentThread.setContextClassLoader(replClassLoader)
	//创建序列化器
      val ser = env.closureSerializer.newInstance()
      logInfo(s"Running $taskName (TID $taskId)")
	//调用ExecutorBackend#statusUpdate向Driver发信息汇报当前状态
      execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
	//记录运行时间和GC信息
      var taskStart: Long = 0
      startGCTime = computeTotalGcTime()

      try {
	//反序列化Task的依赖,得到的结果中有taskFile(运行的文件),taskJar(环境依
//赖),taskBytes(相当于缓冲池)
        val (taskFiles, taskJars, taskBytes) = 
Task.deserializeWithDependencies(serializedTask)
		//下载Task运行缺少的依赖。
        updateDependencies(taskFiles, taskJars)
		//反序列化Task
        task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
	//设置Task运行时的MemoryManager
        task.setTaskMemoryManager(taskMemoryManager)

		//如果Task在序列化前就已经被killed,则会抛出异常;否则,正常执行
        if (killed) {
          throw new TaskKilledException
        }
        logDebug("Task " + taskId + "'s epoch is " + task.epoch)
        env.mapOutputTracker.updateEpoch(task.epoch)

        //运行的实际任务,并测量它的运行时间。
        taskStart = System.currentTimeMillis()
        var threwException = true
        val (value, accumUpdates) = try {
		//调用task#run方法,得到task运行的结果
          val res = task.run(
            taskAttemptId = taskId,
            attemptNumber = attemptNumber,
            metricsSystem = env.metricsSystem)
          threwException = false
          res
        } finally {
		//清理所有分配的内存和分页,并检测是否有内存泄漏
          val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
          if (freedMemory > 0) {
            val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
            if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false) && !threwException) {
              throw new SparkException(errMsg)
            } else {
              logError(errMsg)
            }
          }
        }
	//记录Task完成时间
        val taskFinish = System.currentTimeMillis()

        //如果Task killed,则报错。
        if (task.killed) {
          throw new TaskKilledException
        }
	//否则序列化得到的Task执行的结果
        val resultSer = env.serializer.newInstance()
        val beforeSerialization = System.currentTimeMillis()
        val valueBytes = resultSer.serialize(value)
        val afterSerialization = System.currentTimeMillis()
		//记录相关的metrics
        for (m <- task.metrics) {
          m.setExecutorDeserializeTime(
            (taskStart - deserializeStartTime) + task.executorDeserializeTime)
          m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
          m.setJvmGCTime(computeTotalGcTime() - startGCTime)
          m.setResultSerializationTime(afterSerialization - beforeSerialization)
          m.updateAccumulators()
        }
	//创建直接返回给Driver的结果对象DirectTaskResult
        val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
        val serializedDirectResult = ser.serialize(directResult)
        val resultSize = serializedDirectResult.limit

        val serializedResult: ByteBuffer = {
		//对直接返回的结果对象大小进行判断
          if (maxResultSize > 0 && resultSize > maxResultSize) {
		//大于最大限制1G,直接丢弃ResultTask
            logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
              s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
              s"dropping it.")
            ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
          } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
		//结果大小大于设定的阀值,则放入BlockManager中
            val blockId = TaskResultBlockId(taskId)
            env.blockManager.putBytes(
              blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
            logInfo(
              s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
		//返回非直接返回给Driver的对象TaskResultTask
            ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
          } else {
            //结果不大,直接传回给Driver
            logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
            serializedDirectResult
          }
        }
	//通知Driver Task已完成
        execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

      } //省略备份代码
       finally {//将Task从运行队列中去除
        runningTasks.remove(taskId)
      }
    }

4.给Driver汇报状态

Executor会通过TaskRunner在ThreadPool中来运行具体的Task;在TaskRunner的run方法中会通过调用statusUpdate方法给Driver发信息汇报自己的状态。告诉Driver,Task已经开始运行了(Running状态)。

 

5.反序列化Task的依赖,并下载依赖

TaskRunner内部会有一些准备工作,例如反序列化Task的依赖,然后通过网络来获取需要的文件、Jar等信息。

补充:在执行具体的Task的业务逻辑前会进行四次反序列化:1.TaskDescription反序列化 2,Task的反序列化   3,RDD的反序列化  4,反序列化依赖

 

6.反序列化Task本身

7.获得执行结果

       调用反序列化后的Task的run方法来获得任务执行的结果;

final def run(
    taskAttemptId: Long,
    attemptNumber: Int,
    metricsSystem: MetricsSystem)
  : (T, AccumulatorUpdates) = {
	//创建Task执行的上下文
    context = new TaskContextImpl(
      stageId,
      partitionId,
      taskAttemptId,
      attemptNumber,
      taskMemoryManager,
      metricsSystem,
      internalAccumulators,
      runningLocally = false)
    TaskContext.setTaskContext(context)
context.taskMetrics.setHostname(Utils.localHostName())
try{	//调用runTask执行Task
      (runTask(context), context.collectAccumulators())
    	}//省略部分代码
    }
  }

不同Task类型对抽象方法runTask的实现不同,ShuffleMapTask#runTask方法会调用RDD的iterator()方法来计算Task;事实上,其内部会迭代Partition的元素并利用我们自定的function来进行计算。ResultTask计算过程与之类似。不同的是ShuffleMapTask#runTask在计算具体的partition后实际上会通过shuffleManager获取shufflewriter把当前Task的计算结果根据具体的shuffleManager的实现来写入到具体的文件中,操作结束后会把MapStatus发送给DAGScheduler;而ResultTask#runTask会根据前面Stage的执行结果进行Shuffle产生整个Job最后的结果。

对于ShuffleMapTask,首先需要对RDD以及其依赖关系进行反序列化;最终计算(不考虑cache和checkpoint)时,会调用RDD#compute方法 

def compute(split: Partition, context: TaskContext): Iterator[T]

具体计算时有具体的RDD,例如MapPartitionsRDD的compute;

override def compute(split:Partition, context: TaskContext): Iterator[U] =
  f(context, split.index, firstParent[T].iterator(split,context))  

这里的f就是我们在当前的Stage中计算具体Partition的业务逻辑代码;来源自对我们在该Stage的各个算子中自定义的函数的合并。

8.把Task执行结果序列化

9.判断结果传回Driver的方式

       并根据序列化后的DirectResultTask的大小选择不同的方式将结果传回给Driver端。

若果结果大于1G(可以通过spark.driver.maxResultSize来进行设置),直接丢弃

如果结果“较大”(小于1G但大于一个阀值(akkaFrameSize-AkkaUtils.reservedSizeBytes),在Spark1.6中akkaFrameSize默认为128MB,此时阀值为conf.getInt("spark.akka.frameSize",128)*1024*1024-200*1024),则会放入BlockManager中。(env.blockManager.putBytes(

blockId,serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER))

如果结果不大,则直接传回给Driver。

10.通知Driver Task已经完成

调用ExecutorBackend#statusUpdate方法给Driver发信息汇报自己的状态。告诉Driver,Task已经开始完成了(FINISHED状态)。

11.从运行队列移除当前Task

三、Drier端处理解析

1.Driver收到Executor的任务执行结果

       由上一节,可以看出Task在Executor执行完成时,会通过向Driver发送StatusUpdate的消息来通知Driver任务的状态更新为TaskState.FINISHED。

//ExecutorBackend(CoarseGrainedBackend)#stateUpdate
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
    val msg = StatusUpdate(executorId, taskId, state, data)
    driver match {
	//通知Driver
      case Some(driverRef) => driverRef.send(msg)
      case None => logWarning(s"Drop $msg because has not yet connected to driver")
    }
  }

2.Driver处理消息

//SchedulerBackend(CoarseGrainedBackend).DriverEndpoint#receive
case StatusUpdate(executorId, taskId, state, data) =>
        scheduler.statusUpdate(taskId, state, data.value)    //1
        if (TaskState.isFinished(state)) {
          executorDataMap.get(executorId) match {
            case Some(executorInfo) =>
		//增加这个Executor的可用CPU数
              executorInfo.freeCores += scheduler.CPUS_PER_TASK
		//重新为这个Executor分配资源
              makeOffers(executorId)
            case None =>
              // Ignoring the update since we don't know about the executor.
              logWarning(s"Ignored task status update ($taskId state $state) " +
                s"from unknown executor with ID $executorId")
          }
        }

Driver首先会将任务的状态更新通知给TaskScheduler,然后会在这个Executor上重新分配新的计算任务。(见1)

3.通知TaskScheduler(TaskSchedulerImpl)

//TaskSchedulerImpl#statusUpdate (1处被调用)
	taskIdToTaskSetManager.get(tid) match {
          case Some(taskSet) =>
            if (TaskState.isFinished(state)) {
		//如果Task的状态是FINISHED或者FAILED或者KILLED或者LOST
		//都是为Task执行结束,清理本地的Task数据结构
              taskIdToTaskSetManager.remove(tid)
              taskIdToExecutorId.remove(tid).foreach { execId =>
                if (executorIdToTaskCount.contains(execId)) {
                  executorIdToTaskCount(execId) -= 1
                }
              }
            }
            if (state == TaskState.FINISHED) {
		//任务完成TaskSetManager标记该任务已经结束,此时Task不一定成功结束
              taskSet.removeRunningTask(tid)
		//处理任务的计算结果
              taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) //2
            } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
		//从TaskSetManager的运行Task队列中去除标记为完成的Task,此时Task不一定
//是成功执行结束。
              taskSet.removeRunningTask(tid)
		//处理失败的情况
             taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)//3
            }
        }

执行结束状态包括了FINISHED, FAILED, KILLED, LOST 这四种状态,所以标记为执行结束的Task并非是成功执行Task结束的。val FINISHED_STATES = Set(FINISHED, FAILED, KILLED, LOST)

这里Task的状态只有是FINISHED的时候才是成功执行Task结束的标志,其余的状态例如:FAILED、KILLED和LOST都是Task执行失败的标志。

4.   TaskScheduler获取Task运行结果

这里TaskScheduler处理Task执行结果时,会交给一个后台守护线程池负责。

/**
 * 利用一个线程池来反序列化Task执行结果或者在必要是抓取Task结果。
 */
private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
  extends Logging {
	//设置线程池内线程数,可配置通过spark.resultGetter.threads
  private val THREADS = sparkEnv.conf.getInt("spark.resultGetter.threads", 4)
  private val getTaskResultExecutor = ThreadUtils.newDaemonFixedThreadPool(
    THREADS, "task-result-getter")
	//设置序列化器
  protected val serializer = new ThreadLocal[SerializerInstance] {
    override def initialValue(): SerializerInstance = {
      sparkEnv.closureSerializer.newInstance()
    }
  }
	//定义Task成功执行得到的结果的处理逻辑,?处被调用
  def enqueueSuccessfulTask(
    taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
	//通过线程池来获取Task执行的结果
    getTaskResultExecutor.execute(new Runnable {
      override def run(): Unit = Utils.logUncaughtExceptions {
        try {
          val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
		//如果是直接发送到Driver端的Task执行结果,未利用BlockManager即Executor
//发送Task的最后一种情况,考参照Executor端执行步骤9,判断传回Driver的方
//式
            case directResult: DirectTaskResult[_] =>
		//不符合抓取Task的大小限制
              if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
                return
              }
		//这里反序列化的值是不加锁的,这主要是为了保证多线程对其访问时,不会出现
		//其他线程因为本线程而堵塞的情况,这里我们应该先调用它,获得反序列化的
		//值,以便在TaskSetManager#handleSuccessfulTask中再次调用时,不需要再次
		//反序列化该值
              directResult.value()
		//得到Task执行的结果,由于是directResult,所以不需要远程读取。
              (directResult, serializedData.limit())
		//如果Executor返回给Driver的Task执行结果是间接的,需要借助BlockManager
            case IndirectTaskResult(blockId, size) =>
              if (!taskSetManager.canFetchMoreResults(size)) {
                // 如果结果大小比maxResultSize,则在远程节点上(worker)删除该
//blockManager
                sparkEnv.blockManager.master.removeBlock(blockId)
                return
              }
		//需要从远程节点上抓取Task执行的结果
              logDebug("Fetching indirect task result for TID %s".format(tid))
		//标记Task为需要远程抓取的Task并通知DAGScheduler
              scheduler.handleTaskGettingResult(taskSetManager, tid)
		//从远程节点的BlockManager上获取Task计算结果
              val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
              if (!serializedTaskResult.isDefined) {
                //在Task执行结束获得结果后到driver远程去抓取结果之间,如果运行task的
//机器挂掉,或者该机器的BlockManager已经刷新掉了Task执行结果,都会导致
//远程抓取结果失败,即结果丢失。
                scheduler.handleFailedTask(
                  taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
                return
              }
              //远程抓取结果成功,反序列化结果
              val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
                serializedTaskResult.get)
              //删除远程的结果
              sparkEnv.blockManager.master.removeBlock(blockId)
		//得到IndirectResult类型的结果
		(deserializedResult, size)
          }
          result.metrics.setResultSize(size)
		//标记Task为SuccessfulTask并通知DAGScheduler
          scheduler.handleSuccessfulTask(taskSetManager, tid, result)   //4
        } catch {//省略部分非关键代码
        }
      }
    })
  }
}

这里TaskScheduler获得方式结果主要是依据Driver端得到Executor端返回的Task运行结果确定的,有两种方式1)DirectResult,2)InDirectResult。对于1)直接可以反序列化Driver端接到的返回信息得到Task运行结果;对于2)则需要借助远程节点(worker)上的BlockManager来远程获取结果。

5.TaskScheduler处理运行结果

//4处被调用
def handleSuccessfulTask(
      taskSetManager: TaskSetManager,
      tid: Long,
      taskResult: DirectTaskResult[_]): Unit = synchronized {
    taskSetManager.handleSuccessfulTask(tid, taskResult)
  }

	//TaskSetManager#hadleSuccessfulTask
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
    val info = taskInfos(tid)
    val index = info.index
    info.markSuccessful()
    removeRunningTask(tid)
    //向高层调度器报告结果   5
    sched.dagScheduler.taskEnded(
      tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
   	//判断该TaskSet中Task是否已全部执行成功
	if (!successful(index)) {//该Task还未标记为成功执行
	//增加执行成功的Task 
      tasksSuccessful += 1
      logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(
        info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks))
      // 标记执行成功的Task,如果TaskSet中的所有Task执行成功则停止该TaskSetManager
      successful(index) = true
      if (tasksSuccessful == numTasks) {
        isZombie = true
      }
    } else {
      logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
        " because task " + index + " has already completed successfully")
    }
    //从执行失败的集合中删去该Task,用于Task失败重试
    failedExecutors.remove(index)
	//判断TaskSet中Task是否已全部执行完成,是则说明该TaskSet已执行完成,相应的对该
//TaskSetManager的调度结束,从调度池中删除该TaskSetManager
    maybeFinishTaskSet()
  }

6.TaskScheduler向高层调度器DAGScheduler报告

//DAGScheduler#taskEnd      5处被调用
def taskEnded(
      task: Task[_],
      reason: TaskEndReason,
      result: Any,
      accumUpdates: Map[Long, Any],
      taskInfo: TaskInfo,
      taskMetrics: TaskMetrics): Unit = {
    //加入DAGScheduler的消息队列,等待处理
    eventProcessLoop.post(
      CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics))
  }


//DAGScheduler#doOnReceive
case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)

(1)处理ShuffleMapTask

       对于ShuffleMapTask,首先需要将结果保存到Stage,如果当前Stage所有Task都结束了,则将所有的结果注册到MapOutputTrackerMaster;这样下一个Stage的Task就可以通过他来获取Shuffle的结果原数据信息,进而从Shuffle数据所在的节点获取数据了。

//DAGScheduler#handleTaskCompletion
event.reason match {
      case Success =>
        listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,event.reason, event.taskInfo, event.taskMetrics))
		//从该stage中等待处理的partition中去除Task对应的partition
        stage.pendingPartitions -= task.partitionId
        task match {
		//如果是ShuffleMapTask
          case smt: ShuffleMapTask =>
		//实例化一个shuffleStage实例,用来保存TaskSet结果
            val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
		//跟新本地的状态信息
            updateAccumulators(event)
            val status = event.result.asInstanceOf[MapStatus]
            val execId = status.location.executorId
            logDebug("ShuffleMapTask finished on " + execId)
		//忽略在集群中游走的ShuffleMapTask(来自一个失效的节点的Task结果)。
            if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
              logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
            } else {
		//将结果保存到Stage中。即将Task结果的输出位置放到Stage的数据结构中。
              shuffleStage.addOutputLoc(smt.partitionId, status)
            }
		//如果当前Stage运行完毕
            if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
		//标记当前Stage为Finished,并将其从运行中Stage列表中删除
              markStageAsFinished(shuffleStage)
              logInfo("looking for newly runnable stages")
              logInfo("running: " + runningStages)
              logInfo("waiting: " + waitingStages)
              logInfo("failed: " + failedStages)
              //将整体结果注册到MapOutputTrackerMaster;
              mapOutputTracker.registerMapOutputs(
                shuffleStage.shuffleDep.shuffleId,
                shuffleStage.outputLocInMapOutputTrackerFormat(),
                changeEpoch = true)
		//清除本地缓存
              clearCacheLocs()
		//如果shuffleMapStage中有一些tasks运行失败,没有结果。
              if (!shuffleStage.isAvailable) {
                //则需要重新提交这个shuffleMapStage,并且需要告知顶层调度器TaskScheduler
		//进行处理。
                logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
                  ") because some of its tasks had failed: " +
                  shuffleStage.findMissingPartitions().mkString(", "))
                submitStage(shuffleStage)
              } else {
                // 标记所有等待这个Stage结束的Map-Stage Job为结束状态
		//这里会将这个Job记为Finished状态,并统计输出结果,报告给监听器
                if (shuffleStage.mapStageJobs.nonEmpty) {
                  val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
                  for (job <- shuffleStage.mapStageJobs) {
                    markMapStageJobAsFinished(job, stats)
                  }
                }
              }

从ActiveJob类的注释可以看出,Job可以有两种类型:result job,这会触发ResultStage执行的action操作,或Map-stage Job,在任何下游Stage提交之前计算出其所需的前一个Stage的结果,并对ShuffleMapStage的结果进行映射。后者用于自适应查询计划,用于在提交后期stage之前可以查看上有Stage输出结果的统计信息(如一些结果位置的元数据信息)。我们使用这个类的finalStage字段来对两种类型的Job进行区分。对于Map-Stage会借助MapOutputTracker来映射上游的Stage的Task输出信息,来实现前一个Stage输出信息的位置等元信息传递给后一个Stage的过程;并直接标记Map-Stage Job结束,并报告输出统计信息给监听器。

ActiveJob类中会记录Job所需计算的分片(Partition)数目,以及每个Partition是否计算完成。由于Task与Partition是一一对应的,所以我们从这个类中可以知道有多少个Task,与Task执行完成的个数。


(2)处理ResultTask

       首先,MapOutputTracker会把ShuffleMapTask执行结果交给ResultTask,然后,ResultTask会根据前面Stage的执行结果进行Shuffle产生整个Job最后的结果。

case rt: ResultTask[_, _] =>
            实例化一个ResultStage来存储resultTask
            val resultStage = stage.asInstanceOf[ResultStage]
            resultStage.activeJob match {
              case Some(job) =>
		//如果这个Task未执行完成
                if (!job.finished(rt.outputId)) {
		//更新当前状态
                  updateAccumulators(event)
			//将Task所对应的Partition标记为计算完成
                  job.finished(rt.outputId) = true
			//当前作业中Partition完成数增加
                  job.numFinished += 1
                  // 如果当前的Job所有Partition对已计算完成,就将这个Stage remove掉
                  if (job.numFinished == job.numPartitions) {
                    markStageAsFinished(resultStage)
                    cleanupStateForJobAndIndependentStages(job)
                    listenerBus.post(
                      SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
                  }
		//在处理SucceedTask时,会调用一些用户定义的函数,可能会产生异常,为
		//了确保程序的健壮性,需要进行异常处理。
                  try {
                    job.listener.taskSucceeded(rt.outputId, event.result)//⑥
                  } catch {
                    case e: Exception =>
                      //当异常发生时,有时需要标记ResultStage失败。
                      job.listener.jobFailed(new SparkDriverExecutionException(e))
                  }
                }
              case None =>
		//在任务进行推测执行时,可能有多个Task的执行结果,对于对于的结果,系统
		//会进行忽略处理。
                logInfo("Ignoring result from " + rt + " because its job has finished")

//⑥处被调用 JobListener.scala
private[spark] trait JobListener {
	//对Task执行结果进行处理的核心逻辑
  def taskSucceeded(index: Int, result: Any)    //⑦
	//对Task执行失败进行处理的而核心逻辑。
  def jobFailed(exception: Exception)
}


//对父类trait JobListener中的抽象方法的具体实现
//JobWaiter#taskSucceed  对⑦抽象方法的实现
override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
	//如果当前Job处理已完成,说明Task进行了重复处理,则会报错。
    if (_jobFinished) {
      throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
    }
	//调用用户逻辑,即用户定义的处理方法,来处理Task的结果
    resultHandler(index, result.asInstanceOf[T])
	//记录当前Job的Task完成数增加
    finishedTasks += 1
	//如果当前Job的所有Task都已执行完毕,则表明整个Job完成。
    if (finishedTasks == totalTasks) {
      _jobFinished = true
      jobResult = JobSucceeded
	//通知所有调用JobWaiter#awaitResult的方法,Job执行完成,可以继续运行了。
      this.notifyAll()
    }
  }

在DAGScheduler#JobSubmit中,会得到JobWaiter类的实例waiter,从而获得Job的执行结果。最终在DAGScheduler#runJob中,调用waiter#awaitResult,对JobSuceeded进行报告,并写入日志。Job执行就结束了。

7.补充:Task出错处理

       对于出错或是执行失败的Task,TaskSchedulerImpl#statsUpdate会调用TaskResultGetter#enqueueFailedTask来处理。这个处理过程与执行成功的Task的处理过程是类似的,它们(执行成功和执行失败的Task)会是公用一个线程池来执行处理逻辑。


// TaskResultGetter# enqueueFailedTask定义Task执行失败的处理逻辑,3处被调用 
//这部分可以理解为Scheduler的容错功能。
  def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState,
    serializedData: ByteBuffer) {
	//记录执行失败的原因
    var reason : TaskEndReason = UnknownReason
    try {
	//调用线程池的一个线程来处理。
      getTaskResultExecutor.execute(new Runnable { //具体处理逻辑
        override def run(): Unit = Utils.logUncaughtExceptions {
          val loader = Utils.getContextOrSparkClassLoader
          try {
		//如果是序列化结果为空或是序列化结果大于规定值,则是序列化失败导致Task执行
		//失败。
            if (serializedData != null && serializedData.limit() > 0) {
              reason = serializer.get().deserialize[TaskEndReason](
                serializedData, loader)
            }
          } catch {//序列化过程中抛出异常。
            case cnd: ClassNotFoundException =>
             //由于Task执行失败并非致命性错误,所以这里只需将信息记录到日志里之后,仍然
		//可以继续执行程序
              logError(
                "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
            case ex: Exception => {}
          }
		//调用TaskSchulerImpl#handleFailedTask来处理Task失败,该方法中定义了处理
		//Task失败的核心逻辑。
          scheduler.handleFailedTask(taskSetManager, tid, taskState, reason) //⑧
        }
      })
    } catch {//处理SparkContext已关闭异常
      case e: RejectedExecutionException if sparkEnv.isStopped =>
        // ignore it
    }
  }


//TaskScheduler#hadleFailTask   ⑧处调用
def handleFailedTask(
      taskSetManager: TaskSetManager,
      tid: Long,
      taskState: TaskState,
      reason: TaskEndReason): Unit = synchronized {
   	//调用TaskSetManager处理失败的情况。
 	taskSetManager.handleFailedTask(tid, taskState, reason)    //⑨
    if (!taskSetManager.isZombie && taskState != TaskState.KILLED) {
      //这里需要重新进行资源调度来执行失败的Task,而失败的Task的状态(例如执行失败次数
	//等)已由TaskManager进行了更新,来反应该任务是失败后重新执行的Task
      backend.reviveOffers()
    }
  }

TaskSetManager#handlerFailTask方法主要是将任务标记为失败,并将它重新添加到待处理任务列表,同时通知高层调度器DAG Scheduler。

 

//TaskSetManager#handlerFailTask    ⑨处调用
def handleFailedTask(tid: Long, state: TaskState, reason: TaskEndReason) {
	//省略部分非关键代码,这些代码主要处理一些出错信息,并根据不同的出错信息做一些日志记
	//录操作。
    //如果Executor failed,则尝试重新加入这些Executor。
    failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
      put(info.executorId, clock.getTimeMillis())
	//调用高层调度器(DAGScheduler)进行容错     //⑩
    sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics)
    //将Task加入到待处理任务列表
	addPendingTask(index)
    if (!isZombie && state != TaskState.KILLED
        && reason.isInstanceOf[TaskFailedReason]
		//这里的countTowardTaskFailures指,是否允许在Stage被丢弃前,执行最大次数的
		//Task失败重试。只有当Task的执行失败与Task本身无关时,才会设置为false(例
		//如,执行Task的Executor挂掉了)。
        && reason.asInstanceOf[TaskFailedReason].countTowardsTaskFailures) {
      assert (null != failureReason)
      numFailures(index) += 1
	//如果失败次数大于最大失败次数,则将Task丢弃。
      if (numFailures(index) >= maxTaskFailures) {
        logError("Task %d in stage %s failed %d times; aborting job".format(
          index, taskSet.id, maxTaskFailures))
        abort("Task %d in stage %s failed %d times, most recent failure: %s\nDriver stacktrace:"
          .format(index, taskSet.id, maxTaskFailures, failureReason), failureException)
        return
      }
    }
	//判断TaskSet中Task是否已全部执行完成,是则说明该TaskSet已执行完成,相应的对该
//TaskSetManager的调度结束,从调度池中删除该TaskSetManager
    maybeFinishTaskSet()
  }

 给高层发送消息,调用高层容错机制。

 //DAGScheduler#taskEnd   
	def taskEnded(
      task: Task[_],
      reason: TaskEndReason,
      result: Any,
      accumUpdates: Map[Long, Any],
      taskInfo: TaskInfo,
      taskMetrics: TaskMetrics): Unit = {
    eventProcessLoop.post(
      CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics))
  }


与执行成功的Task一样,向高层调度器DAGScheduler发送的是由CompletionEvent封装的消息。而DAGScheduler会接收到这个消息,对其进行容错处理。

//DAGScheduler#doOnReceive
case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)

这里是否有似曾相识的感觉,其实步骤6中也有这一过程,再DAGScheduler#handleTaskCompletion中,会根据不同的event#reason,也就是出错信息,进行处理。主要处理的有重复提交Resubmitted和远程读取失败FetchFailed,而其他出错情况则大都采用鸵鸟政策,什么也不做。这边是高层DAGScheduler的容错处理。

通过对Driver端执行的过程的观察,我们可以看出底层调度器和高层调度器是紧密合作的,很多时候,在接收到Worker端的StateUpdate信息后,先由TaskSchedulerImpl进行处理,然后同时底层调度器,将这些信息报告给高层调度器,就通信过程来看,真正与Worker联系的是底层调度器,这是在Task层次上的;而底层调度其会将这些信息进行加工,向高层调度器报告,这是联系的内容大都是TaskSetManager,所以这是就是在TaskSetManager层次上进行处理的。所以我们可以看到底层和高层进行处理时,所处的层次是不一样的,这也就是为什么会划分两个调度器的原因了。

对于容错,底层调度器和高层调度器也是合作进行的,所以Task在出错时,会进行两个层次上的容错处理,这就大大提交了容错的效率和可靠性。



参考书目:张安站 --Spark技术内幕


说明:

本文是由DT大数据梦工厂的IFM课程第37课为基础上,加入一些参考资料所做的笔记

技术分享


















第三十七课 Spark之Task执行原理及结果

标签:

原文地址:http://blog.csdn.net/sinat_25306771/article/details/51451908

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!