标签:base class tor scheduler skin foreach each oca hand
// Concurrent map from a task ID (Long) to the TaskRunner stored for that task.
private val runningTasks: ConcurrentHashMap[Long, TaskRunner] =
  new ConcurrentHashMap[Long, TaskRunner]()

// Store the runner under the ID of the task it was created for.
runningTasks.put(taskDescription.taskId, taskRunner)
// Fetch the task set registered for this task, then drop the task's bookkeeping entries.
// NOTE(review): `taskSet` is dereferenced directly below; confirm in the enclosing code
// that the lookup cannot yield an absent value for `tid`.
val taskSet = taskIdToTaskSetManager.get(tid)
taskIdToTaskSetManager.remove(tid)

// If an executor was recorded for the task, also remove the task from that executor's
// set of running task IDs.
for {
  execId <- taskIdToExecutorId.remove(tid)
  runningIds <- executorIdToRunningTaskIds.get(execId)
} {
  runningIds.remove(tid)
}

taskSet.removeRunningTask(tid)

// Hand the serialized data to the result getter according to the reported state.
state match {
  case TaskState.FINISHED =>
    taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
  case TaskState.FAILED | TaskState.KILLED | TaskState.LOST =>
    taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
  case _ =>
    // Any other state: nothing is enqueued.
}
// Mark the task's info record as FINISHED, stamped with the current clock time.
val finishedInfo = taskInfos(tid)
val finishedAt = clock.getTimeMillis()
finishedInfo.markFinished(TaskState.FINISHED, finishedAt)

// Remove the task from the running tasks before reporting its result.
removeRunningTask(tid)

// Report the successful task, its result value and its accumulator updates
// through the scheduler's dagScheduler.
sched.dagScheduler.taskEnded(
  tasks(index),
  Success,
  result.value(),
  result.accumUpdates,
  finishedInfo)
/**
 * Scheduler event carrying the outcome of a task.
 *
 * @param task         the task this event refers to
 * @param reason       why the task ended
 * @param result       the value carried with the event (untyped)
 * @param accumUpdates the accumulators carried with the event
 * @param taskInfo     the [[TaskInfo]] record carried with the event
 */
private[scheduler] case class CompletionEvent(
    task: Task[_],
    reason: TaskEndReason,
    result: Any,
    accumUpdates: Seq[AccumulatorV2[_, _]],
    taskInfo: TaskInfo)
  extends DAGSchedulerEvent
/**
 * Submits every waiting stage that lists `parent` among its parents.
 *
 * The matching stages are removed from `waitingStages` and then submitted in
 * ascending order of their `firstJobId`.
 *
 * @param parent the stage whose waiting child stages should be submitted
 */
private def submitWaitingChildStages(parent: Stage): Unit = {
  // ... (statements elided in the original excerpt)

  // Snapshot the matching stages into an array first, so that removing them from
  // `waitingStages` below does not affect the collection being iterated afterwards.
  val childStages = waitingStages.filter(_.parents.contains(parent)).toArray
  waitingStages --= childStages
  for (stage <- childStages.sortBy(_.firstJobId)) {
    submitStage(stage)
  }
}
// Remove the stage registered under `stageId` (if there is one) from every
// structure that holds the stage object itself.
for (stage <- stageIdToStage.get(stageId)) {
  if (runningStages.contains(stage)) {
    logDebug("Removing running stage %d".format(stageId))
    runningStages -= stage
  }
  // Drop the first shuffle-id entry whose mapped stage is this stage, if any.
  for ((k, _) <- shuffleIdToMapStage.find(_._2 == stage)) {
    shuffleIdToMapStage.remove(k)
  }
  if (waitingStages.contains(stage)) {
    logDebug("Removing stage %d from waiting set.".format(stageId))
    waitingStages -= stage
  }
  if (failedStages.contains(stage)) {
    logDebug("Removing stage %d from failed set.".format(stageId))
    failedStages -= stage
  }
}
// data structures based on StageId
stageIdToStage -= stageId
// Structures keyed by the job (its id or the job object itself).
jobIdToStageIds -= job.jobId
jobIdToActiveJob -= job.jobId
activeJobs -= job
标签:base class tor scheduler skin foreach each oca hand
原文地址:http://www.cnblogs.com/hframe/p/6970818.html