Spark Source Code Analysis: TaskScheduler Task Submission and the Task Best-Location Algorithm

The previous article, 《…》, left off at the DAGScheduler. This article continues from DAGScheduler.submitMissingTasks: computing each task's preferred locations, building the tasks, and handing them to the TaskScheduler, which then assigns them to executors by data locality. submitMissingTasks first resolves the preferred locations of every partition that still needs to be computed:
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
  stage match {
    case s: ShuffleMapStage =>
      partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
    case s: ResultStage =>
      val job = s.activeJob.get
      partitionsToCompute.map { id =>
        val p = s.partitions(id)
        (id, getPreferredLocs(stage.rdd, p))
      }.toMap
  }
}  // ... (catch elided in this excerpt)
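getPreferredLocs is where a task's "best location" comes from. It delegates to a recursive helper that checks, in order: whether the partition is cached (use the cached block's hosts), whether the RDD itself reports preferred locations (e.g. HDFS block hosts), and otherwise the first narrow-dependency parent that reports a location. Below is a minimal self-contained sketch of that recursion, with stub types standing in for Spark's RDD and TaskLocation (illustrative only, not the real classes):

import scala.collection.mutable

// Stub with just the pieces the location lookup needs.
trait RddStub {
  def id: Int
  def cachedLocs(partition: Int): Seq[String]             // hosts where the partition is cached
  def preferredLocs(partition: Int): Seq[String]          // the RDD's own preference, e.g. HDFS hosts
  def narrowParents(partition: Int): Seq[(RddStub, Int)]  // (parent RDD, parent partition) pairs
}

object PreferredLocsSketch {
  def preferredLocations(
      rdd: RddStub,
      partition: Int,
      visited: mutable.Set[(Int, Int)] = mutable.Set()): Seq[String] = {
    if (!visited.add((rdd.id, partition))) return Nil  // avoid revisiting on diamond lineages
    val cached = rdd.cachedLocs(partition)
    if (cached.nonEmpty) return cached                 // 1. cached block locations win
    val prefs = rdd.preferredLocs(partition)
    if (prefs.nonEmpty) return prefs                   // 2. the RDD's own preference
    for ((parent, p) <- rdd.narrowParents(partition)) {
      val locs = preferredLocations(parent, p, visited)
      if (locs.nonEmpty) return locs                   // 3. first located narrow parent wins
    }
    Nil                                                // no preference: the task can run anywhere
  }
}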
With the locations in hand, submitMissingTasks builds one task per partition, each carrying its preferred locations: ShuffleMapTasks for a ShuffleMapStage, ResultTasks for a ResultStage:

val tasks: Seq[Task[_]] = try {
  stage match {
    case stage: ShuffleMapStage =>
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = stage.rdd.partitions(id)
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, stage.internalAccumulators)
      }
    case stage: ResultStage =>
      val job = stage.activeJob.get
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, id, stage.internalAccumulators)
      }
  }
}  // ... (catch elided in this excerpt)
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")stage.pendingPartitions ++= tasks.map(_.partitionId)logDebug("New pending partitions: " + stage.pendingPartitions)taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
Inside TaskSchedulerImpl.submitTasks, each TaskSet gets its own TaskSetManager, tracked per stage attempt:

val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
val stageTaskSets =
  taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.stageAttemptId) = manager
private[spark] class TaskSetManager(
    sched: TaskSchedulerImpl,      // the TaskSchedulerImpl this manager is bound to
    val taskSet: TaskSet,
    val maxTaskFailures: Int,      // maximum number of retries before a task is given up on
    clock: Clock = new SystemClock())
  extends Schedulable with Logging
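maxTaskFailures is driven by the real spark.task.maxFailures setting (default 4); once a task has failed that many times, the TaskSetManager aborts the whole TaskSet and the job fails. For example, to allow more retries:

import org.apache.spark.SparkConf

// Allow each task up to 8 attempts instead of the default 4.
val conf = new SparkConf().set("spark.task.maxFailures", "8")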
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
// add the TaskSetManager to the rootPool scheduling pool; the schedulableBuilder decides the scheduling order
Which SchedulableBuilder is used depends on spark.scheduler.mode, read in TaskSchedulerImpl; initialize then builds the root pool and the matching builder:

// default scheduler is FIFO
private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
    }
  }
  schedulableBuilder.buildPools()
}
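To opt into FAIR scheduling instead of the default FIFO, set the property before the SparkContext is created. A minimal usage sketch (app name and master are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("fair-scheduling-demo")   // placeholder app name
  .setMaster("local[4]")                // placeholder master
  .set("spark.scheduler.mode", "FAIR")  // default is FIFO
val sc = new SparkContext(conf)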
submitTasks ends by calling backend.reviveOffers(). In CoarseGrainedSchedulerBackend this just sends a ReviveOffers message to the driver endpoint:

override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}
On receiving ReviveOffers, the driver endpoint calls makeOffers, which turns the free resources of the alive executors into WorkerOffers and asks the scheduler for tasks to launch:

// Make fake resource offers on all executors
private def makeOffers() {
  // keep only executors that are still alive
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  // wrap each executor into a WorkerOffer (its id, host, and free cores)
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}
TaskSchedulerImpl.resourceOffers does the actual assignment. It first shuffles the offers so tasks spread evenly across executors, and prepares one task buffer per executor, sized by its free cores:

// shuffle the offers randomly
val shuffledOffers = Random.shuffle(offers)
// build a two-dimensional structure holding, for each executor, the tasks that will be assigned to it
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
The TaskSetManagers are then sorted according to the SchedulableBuilder's scheduling algorithm (FIFO by default; see the sketch after the next code block):
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) {
  logDebug("parentName: %s, name: %s, runningTasks: %s".format(
    taskSet.parent.name, taskSet.name, taskSet.runningTasks))
  if (newExecAvail) {
    taskSet.executorAdded()
  }
}
// Nested loop: take each TaskSet in scheduling order and try to launch its tasks,
// walking the locality levels from best to worst.
// Locality-level order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
var launchedTask = false
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
  do {
    launchedTask = resourceOfferSingleTaskSet(
      taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
  } while (launchedTask)
}
if (tasks.size > 0) {
  hasLaunchedTask = true
}
return tasks
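Under the default FIFO mode, getSortedTaskSetQueue orders managers by job priority (the job id) first and stage id second, lower values first. A toy sketch of that comparison using a stub type (not Spark's actual Schedulable):

// Stub carrying only the fields FIFO ordering looks at.
case class SchedulableStub(priority: Int, stageId: Int)

object FifoOrderingSketch {
  // Earlier job (lower priority value) first; within a job, lower stageId first.
  def fifoLess(s1: SchedulableStub, s2: SchedulableStub): Boolean =
    if (s1.priority != s2.priority) s1.priority < s2.priority
    else s1.stageId < s2.stageId

  def main(args: Array[String]): Unit = {
    val queue = Seq(SchedulableStub(2, 5), SchedulableStub(1, 3), SchedulableStub(1, 2))
    println(queue.sortWith(fifoLess))
    // List(SchedulableStub(1,2), SchedulableStub(1,3), SchedulableStub(2,5))
  }
}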
At the current data-locality level, resourceOfferSingleTaskSet tries each executor in turn, calling TaskSetManager's resourceOffer method to place a task on it:
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: Seq[ArrayBuffer[TaskDescription]]): Boolean = {
  var launchedTask = false
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    // only offer this executor if it has at least CPUS_PER_TASK free cores (defaults to 1)
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetManager(tid) = taskSet
          taskIdToExecutorId(tid) = execId
          executorIdToTaskCount(execId) += 1
          executorsByHost(host) += execId
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      }  // ... (catch elided in this excerpt)
    }
  }
  return launchedTask
}
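Inside TaskSetManager.resourceOffer, the locality decision follows delay scheduling: the manager only falls back to a worse locality level after spark.locality.wait (3s by default) has elapsed at the current level without launching a task. A simplified stand-alone sketch of that fallback rule (illustrative names and a single wait value; the real TaskSetManager keeps per-level waits):

object DelaySchedulingSketch {
  val levels = Vector("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")
  val localityWaitMs = 3000L  // spark.locality.wait defaults to 3s

  // Given the current level index and the time of the last task launch,
  // return the worst level we are currently allowed to schedule at.
  def allowedLevel(currentIndex: Int, lastLaunchTimeMs: Long, nowMs: Long): Int = {
    var idx = currentIndex
    var launch = lastLaunchTimeMs
    // each full wait period without a launch unlocks one worse level
    while (idx < levels.size - 1 && nowMs - launch >= localityWaitMs) {
      launch += localityWaitMs
      idx += 1
    }
    idx
  }

  def main(args: Array[String]): Unit = {
    // 7 seconds without a launch at PROCESS_LOCAL unlocks RACK_LOCAL
    println(levels(allowedLevel(0, lastLaunchTimeMs = 0L, nowMs = 7000L)))
  }
}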
Original article: http://www.cnblogs.com/zhouyf/p/5743382.html