In the previous post, 《Spark源码分析之五:Task调度(一)》, we traced task scheduling as far as DriverEndpoint's makeOffers() method, which handles the incoming ReviveOffers event. The code is as follows:
private def makeOffers() {
  // Filter out executors that are being killed, keeping only the alive ones
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  // Build a WorkerOffer (available resources) from each executor's host and free cores
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  // Let the TaskScheduler assign tasks to the offers, then launch them
  launchTasks(scheduler.resourceOffers(workOffers))
}
The logic is simple and consists of three steps:
First, filter the executors that are under killing out of executorDataMap to obtain activeExecutors;
Second, use the executorHost and freeCores of each ExecutorData in activeExecutors to build workOffers, i.e. the available resources;
Third, call the scheduler's resourceOffers() method to allocate the resources and then call launchTasks() to launch the tasks; this scheduler is TaskSchedulerImpl.
Let us go through these one by one, starting with executorDataMap, which is defined as follows:
private val executorDataMap = new HashMap[String, ExecutorData]
It is the collection of executor data that CoarseGrainedSchedulerBackend maintains for the cluster: the key is the executorId (a String) and the value is an ExecutorData describing the executor. The main fields of ExecutorData are:
1. executorEndpoint: RpcEndpointRef, a reference to the executor's RPC endpoint, used for communication;
2. executorAddress: RpcAddress, the RPC address, used for communication;
3. executorHost: String, the host the executor runs on;
4. freeCores: Int, the number of currently free CPU cores;
5. totalCores: Int, the total number of CPU cores;
6. logUrlMap: Map[String, String], the map of log URLs.
Through executorDataMap we therefore know the current load of every executor in the cluster, which is exactly what resource allocation and task scheduling need. When and how the entries of executorDataMap are updated is another question; let us press on.
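Before moving on, here is a minimal, self-contained sketch of that bookkeeping. The types and values are toy stand-ins (ExecutorData is reduced to three fields, executorIsAlive is a simple set lookup), but the filter-then-offer shape mirrors makeOffers() above:

import scala.collection.mutable.{HashMap, HashSet}

// Toy stand-ins for Spark's ExecutorData and WorkerOffer (same names, simplified fields)
case class ExecutorData(executorHost: String, freeCores: Int, totalCores: Int)
case class WorkerOffer(executorId: String, host: String, cores: Int)

object ExecutorMapDemo {
  def main(args: Array[String]): Unit = {
    val executorDataMap = new HashMap[String, ExecutorData]
    executorDataMap("exec-1") = ExecutorData("host-a", 4, 8)
    executorDataMap("exec-2") = ExecutorData("host-b", 0, 8)
    executorDataMap("exec-3") = ExecutorData("host-a", 8, 8)

    // Executors pending removal are treated as not alive, like executorsPendingToRemove
    val executorsPendingToRemove = HashSet("exec-2")
    def executorIsAlive(id: String): Boolean = !executorsPendingToRemove.contains(id)

    // Same shape as makeOffers(): keep live executors, then build offers from host/freeCores
    val workOffers = executorDataMap
      .filter { case (id, _) => executorIsAlive(id) }
      .map { case (id, data) => WorkerOffer(id, data.executorHost, data.freeCores) }
      .toSeq

    workOffers.foreach(println)
  }
}

Running it prints one WorkerOffer per live executor; exec-2 is skipped because it is pending removal.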
For the first step, filtering out the executors that are under killing is implemented by calling executorIsAlive() for every executor in executorDataMap, which checks whether it appears in executorsPendingToRemove or executorsPendingLossReason; both structures hold executors that are about to be removed or have already been lost.
In the second step, after the executors that are invalid or about to become invalid have been dropped, the executorHost and freeCores of each ExecutorData in activeExecutors are used to build the workOffers, i.e. the resources. A workOffer is even simpler: it is a WorkerOffer object representing the usable resources of one executor. The WorkerOffer code is:
private[spark]
case class WorkerOffer(executorId: String, host: String, cores: Int)
The third and most important step first calls scheduler.resourceOffers(workOffers), i.e. TaskSchedulerImpl's resourceOffers() method, and then calls launchTasks() to ship the scheduled tasks to the executors for execution.
Let us look at TaskSchedulerImpl's resourceOffers() method first. The code is as follows:
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive, remember its host, and track whether a new executor appeared
  var newExecAvail = false
  for (o <- offers) {
    // Update the executorId -> host mapping
    executorIdToHost(o.executorId) = o.host
    // Initialize the running-task count for this executor if it is not present yet
    executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
    if (!executorsByHost.contains(o.host)) {
      // A new slave joined: record it and notify the DAGScheduler
      executorsByHost(o.host) = new HashSet[String]()
      executorAdded(o.executorId, o.host)
      newExecAvail = true
    }
    // Update the rack -> hosts mapping
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }

  // Randomly shuffle offers to avoid always placing tasks on the same set of workers
  val shuffledOffers = Random.shuffle(offers)
  // Build one task buffer per offer, pre-sized to the offer's core count
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  // The CPUs still available on each offer
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  // Get the task sets sorted by the scheduling policy (FIFO or FAIR)
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      // A new executor is available, so recompute this task set's locality levels
      taskSet.executorAdded()
    }
  }

  // Take each TaskSet in scheduling order and offer it the nodes in increasing order of
  // locality level, so that it gets a chance to launch local tasks on all of them
  var launchedTask = false
  for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTask = resourceOfferSingleTaskSet(
        taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
    } while (launchedTask)
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
The main flow is as follows:
1. Set the flag newExecAvail to false; this flag is set when a new slave is added and is used later when recomputing the task sets' locality levels;
2. Loop over offers; each WorkerOffer contains executorId, host and cores and represents one available executor in the cluster:
2.1. Update executorIdToHost, a HashMap storing the executorId -> host mapping;
2.2. Update executorIdToTaskCount, the map of how many tasks are running on each executor, initializing the count to 0 if the executor was not present;
2.3. If a new slave has joined:
2.3.1. Add an entry to executorsByHost with the host as key and a new HashSet[String]() as value;
2.3.2. Send an ExecutorAdded event, which is handled by DAGScheduler's handleExecutorAdded() method;
2.3.3. Set the flag newExecAvail to true;
2.4. Update hostsByRack;
3. Randomly shuffle the offers (the available executor resources) so that tasks are not always placed on the same set of workers;
4. Build the list of task buffers to be assigned to the workers: for each executor, create an ArrayBuffer sized to its number of cores, allowing maximum parallelism;
5. Collect the usable CPU resources into availableCpus;
6. Call Pool.getSortedTaskSetQueue to obtain the sorted collection of task sets, sortedTaskSets;
7. Loop over each taskSet in sortedTaskSets:
7.1. If a new slave has joined, call the taskSet's executorAdded() method to dynamically adjust its locality levels; this is easy to understand: a new slave may hold data for some of the tasks, so the locality rules need to be recomputed;
8. Loop over sortedTaskSets and schedule each TaskSet according to the locality rules, maximizing task locality:
8.1. For each taskSet, call resourceOfferSingleTaskSet() to schedule that task set;
9. Set the flag hasLaunchedTask and return tasks.
Next, let us go through each of these steps in detail.
Step 1 needs no explanation: it just sets the flag newExecAvail to false; remember that this flag is set when a new slave is added and is used later when recomputing locality levels.
Step 2 loops over the available executor resources offers and updates a few data structures; when a new slave joins, the flag newExecAvail is set to true and an ExecutorAdded event is sent, to be handled by DAGScheduler's handleExecutorAdded() method. Let us look at that method:
private[scheduler] def handleExecutorAdded(execId: String, host: String) {
  // If the executor was previously recorded as failed, remove it from failedEpoch
  if (failedEpoch.contains(execId)) {
    logInfo("Host added was in lost list earlier: " + host)
    failedEpoch -= execId
  }
  submitWaitingStages()
}
Very simple: if the execId is present in failedEpoch, the collection that records the executors the system has detected as failed (keyed by execId), it is removed. Then submitWaitingStages() is called to submit the waiting stages. We analyzed that method before, so it is not repeated here, but one question remains: the stages were already submitted earlier, so why submit them again here? Let us leave that for later.
Step 3 randomly shuffles the offers so that tasks are not always placed on the same group of workers; nothing special here, just a randomization strategy against hot spots.
Step 4 is similar: it builds the task list to be assigned to the workers by creating, for each executor, an ArrayBuffer of TaskDescription whose capacity equals the executor's cores, i.e. maximum parallelism, making full use of each executor's cores.
Step 5 simply collects the cores of those executors into availableCpus, the CPU resources that may be used. A short sketch of steps 3 to 5 follows.
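This is a hedged, self-contained sketch of those three steps; WorkerOffer and TaskDescription are simplified stand-ins for the real classes, and the offer values are made up:

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Simplified stand-ins; Spark's TaskDescription carries the serialized task, here just a label
case class WorkerOffer(executorId: String, host: String, cores: Int)
case class TaskDescription(taskId: Long, name: String)

object OfferShuffleDemo {
  def main(args: Array[String]): Unit = {
    val offers = Seq(
      WorkerOffer("exec-1", "host-a", 4),
      WorkerOffer("exec-2", "host-b", 2),
      WorkerOffer("exec-3", "host-c", 6))

    // Step 3: shuffle so tasks are not always placed on the same workers
    val shuffledOffers = Random.shuffle(offers)

    // Step 4: one buffer per offer, pre-sized to its core count (max parallelism per executor)
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))

    // Step 5: the CPUs still available on each offer, indexed the same way as `tasks`
    val availableCpus = shuffledOffers.map(_.cores).toArray

    println(shuffledOffers.map(_.executorId).mkString(", "))
    println(availableCpus.mkString(", "))
  }
}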
Now for the key part, step 6, which calls Pool.getSortedTaskSetQueue to obtain the sorted collection of task sets. Remember this Pool? It is the scheduling pool inside the scheduler that the previous post 《Spark源码分析之五:Task调度(一)》 discussed. Its getSortedTaskSetQueue() method is as follows:
override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
  // Buffer that will hold the TaskSetManagers in scheduling order
  var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
  // Sort the schedulables in this pool with the configured scheduling algorithm (FIFO or FAIR)
  val sortedSchedulableQueue =
    schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
  // Recursively collect the sorted task sets of every schedulable in the pool
  for (schedulable <- sortedSchedulableQueue) {
    sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
  }
  sortedTaskSetQueue
}
First an ArrayBuffer is created to hold TaskSetManagers. Then the schedulables already stored in the Pool, i.e. the schedulableQueue, are sorted with the taskSetSchedulingAlgorithm scheduling rule to obtain sortedSchedulableQueue, and each schedulable's getSortedTaskSetQueue() is used to fill sortedTaskSetQueue, which is returned at the end. TaskSetManager's own getSortedTaskSetQueue() is trivial; it just appends itself to an ArrayBuffer[TaskSetManager]:
override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
  var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]()
  sortedTaskSetQueue += this
  sortedTaskSetQueue
}
Let us focus on the scheduling rule, taskSetSchedulingAlgorithm, which is defined as follows:
var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
  schedulingMode match {
    case SchedulingMode.FAIR =>
      new FairSchedulingAlgorithm()
    case SchedulingMode.FIFO =>
      new FIFOSchedulingAlgorithm()
  }
}
There are two algorithms, FAIR and FIFO. Let us take FIFO as the example first; the code lives in SchedulingAlgorithm.scala:
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    // Compare priorities first (for a TaskSet the priority is its jobId)
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      // Same job: fall back to comparing stage ids
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    if (res < 0) {
      true
    } else {
      false
    }
  }
}
Very simple: the priorities of the two TaskSetManagers are compared first, and if they are equal their stageIds are compared. The priority is set to the jobId when the TaskSet is created, so FIFO schedules by job order first and by stage order within a job: one job is scheduled before the next, and inside a job the stages are scheduled in order. The code that fixes the priority is:
taskScheduler.submitTasks(new TaskSet(
  tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
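As a quick worked illustration of that ordering, here is a self-contained demo; FifoEntry is a hypothetical record holding just the two fields the comparator reads (priority, i.e. jobId, and stageId), not Spark's Schedulable:

// Minimal stand-in for the fields FIFOSchedulingAlgorithm actually compares
case class FifoEntry(name: String, priority: Int, stageId: Int) // priority == jobId

object FifoOrderDemo {
  // Same logic as FIFOSchedulingAlgorithm.comparator: lower jobId first, then lower stageId
  def fifoLessThan(s1: FifoEntry, s2: FifoEntry): Boolean = {
    var res = math.signum(s1.priority - s2.priority)
    if (res == 0) res = math.signum(s1.stageId - s2.stageId)
    res < 0
  }

  def main(args: Array[String]): Unit = {
    val entries = Seq(
      FifoEntry("job1-stage3", priority = 1, stageId = 3),
      FifoEntry("job0-stage2", priority = 0, stageId = 2),
      FifoEntry("job0-stage1", priority = 0, stageId = 1))

    // Expected order: job0-stage1 -> job0-stage2 -> job1-stage3
    println(entries.sortWith(fifoLessThan).map(_.name).mkString(" -> "))
  }
}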
The more involved one is FairSchedulingAlgorithm:
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    // A schedulable is "needy" if it is running fewer tasks than its minimum share
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    var compare: Int = 0

    if (s1Needy && !s2Needy) {
      // Only s1 is below its minimum share: schedule s1 first
      return true
    } else if (!s1Needy && s2Needy) {
      return false
    } else if (s1Needy && s2Needy) {
      // Both are below their minimum share: the smaller minShare usage ratio wins
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      // Both have reached their minimum share: compare running tasks relative to weight
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }

    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      // Everything equal: fall back to name ordering
      s1.name < s2.name
    }
  }
}
Its scheduling logic is roughly:
1. First check whether each side's number of running tasks is below its minimum share of cores (minShare). If exactly one of them is below its minimum share, that one is scheduled first: a pool that has not yet received its guaranteed minimum share is "needy" and should be satisfied before the others;
2. If both are below their minimum share, compare the ratio of running tasks to minimum share; the smaller ratio goes first;
3. Otherwise, both have reached their minimum share, so compare the weight usage, i.e. the ratio of running tasks to the schedulable's weight; the weight expresses how large a share of the resources the pool is entitled to (the larger the weight, the more resources it should get), so the smaller tasks-to-weight ratio is scheduled first. See the worked demo below.
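To make the three rules concrete, here is a small self-contained demo that applies the same comparison to toy pool records; FairEntry and the numbers are made up for illustration:

// Toy record holding just the fields the FAIR comparator looks at
case class FairEntry(name: String, minShare: Int, runningTasks: Int, weight: Int)

object FairOrderDemo {
  // Same ordering rules as FairSchedulingAlgorithm.comparator
  def fairLessThan(s1: FairEntry, s2: FairEntry): Boolean = {
    val s1Needy = s1.runningTasks < s1.minShare
    val s2Needy = s2.runningTasks < s2.minShare
    val minShareRatio1 = s1.runningTasks.toDouble / math.max(s1.minShare, 1).toDouble
    val minShareRatio2 = s2.runningTasks.toDouble / math.max(s2.minShare, 1).toDouble
    val taskToWeightRatio1 = s1.runningTasks.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = s2.runningTasks.toDouble / s2.weight.toDouble

    if (s1Needy && !s2Needy) true
    else if (!s1Needy && s2Needy) false
    else if (s1Needy && s2Needy) {
      // Both below their minimum share: the one with the lower usage ratio goes first
      val c = minShareRatio1.compareTo(minShareRatio2)
      if (c != 0) c < 0 else s1.name < s2.name
    } else {
      // Both satisfied: compare running tasks relative to weight
      val c = taskToWeightRatio1.compareTo(taskToWeightRatio2)
      if (c != 0) c < 0 else s1.name < s2.name
    }
  }

  def main(args: Array[String]): Unit = {
    val pools = Seq(
      FairEntry("poolA", minShare = 4, runningTasks = 1, weight = 1), // needy, ratio 0.25
      FairEntry("poolB", minShare = 2, runningTasks = 5, weight = 2), // satisfied, 2.5 per weight
      FairEntry("poolC", minShare = 4, runningTasks = 3, weight = 1)) // needy, ratio 0.75

    // Expected order: poolA -> poolC (both needy, lower ratio first) -> poolB
    println(pools.sortWith(fairLessThan).map(_.name).mkString(" -> "))
  }
}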
With the sorted task sets in hand we arrive at step 7: if a new slave has joined, the taskSet's executorAdded() method is called, i.e. TaskSetManager's executorAdded(). The code is:
def executorAdded() {
  recomputeLocality()
}
Nothing to it; keep following into recomputeLocality(). The code is:
def recomputeLocality() {
  // The locality level we were using before the recomputation
  val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
  // Recompute the valid locality levels for this TaskSet
  myLocalityLevels = computeValidLocalityLevels()
  // Recompute the wait time for each locality level
  localityWaits = myLocalityLevels.map(getLocalityWait)
  // Re-point currentLocalityIndex at the previous level inside the new level array
  currentLocalityIndex = getLocalityIndex(previousLocalityLevel)
}
First a word about currentLocalityIndex, which is defined as:
var currentLocalityIndex = 0
It is the index into the array of valid locality levels and indicates the current locality, i.e. the locality level used when the previous task was launched.
Next, myLocalityLevels is the array of locality levels this TaskSet should use; it is initialized when the TaskSetManager object is instantiated, and the variable is defined as:
var myLocalityLevels = computeValidLocalityLevels()
computeValidLocalityLevels() is the method that computes the locality levels this TaskSet can use. The code is:
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
  import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
  val levels = new ArrayBuffer[TaskLocality.TaskLocality]
  // PROCESS_LOCAL: some pending task prefers an executor that is still alive
  if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
      pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
    levels += PROCESS_LOCAL
  }
  // NODE_LOCAL: some pending task prefers a host that still has alive executors
  if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 &&
      pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
    levels += NODE_LOCAL
  }
  // NO_PREF: there are pending tasks with no locality preference
  if (!pendingTasksWithNoPrefs.isEmpty) {
    levels += NO_PREF
  }
  // RACK_LOCAL: some pending task prefers a rack that still has alive hosts
  if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
      pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
    levels += RACK_LOCAL
  }
  // ANY is always valid as the last resort
  levels += ANY
  logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
  levels.toArray
}
Let us first look at a few important data structures used here. The TaskSetManager holds the following:
// Pending task indices keyed by the executor they prefer
private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
// Pending task indices keyed by the host they prefer
private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
// Pending task indices keyed by the rack they prefer
private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]
// Pending task indices with no locality preference
var pendingTasksWithNoPrefs = new ArrayBuffer[Int]
// All pending task indices
val allPendingTasks = new ArrayBuffer[Int]
These structures record which pending tasks can run close to which executors, hosts and racks. When the TaskSetManager object is constructed, the following code runs:
// Add all our tasks to the pending lists, in reverse index order
for (i <- (0 until numTasks).reverse) {
  addPendingTask(i)
}
It walks the task indices of the TaskSetManager in reverse order. addPendingTask() looks like this:
private def addPendingTask(index: Int) {
  // Utility: add the index to the list if it is not already there
  def addTo(list: ArrayBuffer[Int]) {
    if (!list.contains(index)) {
      list += index
    }
  }

  for (loc <- tasks(index).preferredLocations) {
    loc match {
      case e: ExecutorCacheTaskLocation =>
        // The task prefers a specific executor (its data is cached there)
        addTo(pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer))
      case e: HDFSCacheTaskLocation => {
        // The task prefers a host whose HDFS cache holds its data; register it
        // with every alive executor on that host
        val exe = sched.getExecutorsAliveOnHost(loc.host)
        exe match {
          case Some(set) => {
            for (e <- set) {
              addTo(pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer))
            }
            logInfo(s"Pending task $index has a cached location at ${e.host} " +
              ", where there are executors " + set.mkString(","))
          }
          case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
            ", but there are no executors alive there.")
        }
      }
      case _ => Unit
    }
    // In all cases, record the host-level preference
    addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
    // And the rack-level preference, if the host's rack is known
    for (rack <- sched.getRackForHost(loc.host)) {
      addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
    }
  }

  // Tasks with no preferred locations go into pendingTasksWithNoPrefs
  if (tasks(index).preferredLocations == Nil) {
    addTo(pendingTasksWithNoPrefs)
  }
  // Every task ends up in allPendingTasks
  allPendingTasks += index
}
The comments above make the code clear, so only the key point here: the method uses each task's preferredLocations to decide which structure the task index is stored in, so that every task's location information ends up in the appropriate map for later scheduling. The simplified sketch below shows that routing.
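This is a condensed, self-contained sketch of the routing; TaskLocation, the helper maps and the sample preferences are hypothetical simplifications (the HDFS-cache and rack branches are omitted), kept only to show how one index can land in several pending structures:

import scala.collection.mutable.{ArrayBuffer, HashMap}

// Simplified preferred-location model: a host plus, optionally, a concrete executor on it
case class TaskLocation(host: String, executorId: Option[String] = None)

object PendingTaskRoutingDemo {
  val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
  val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
  val pendingTasksWithNoPrefs = new ArrayBuffer[Int]
  val allPendingTasks = new ArrayBuffer[Int]

  // Route one task index into the pending structures according to its preferred locations
  def addPendingTask(index: Int, preferredLocations: Seq[TaskLocation]): Unit = {
    for (loc <- preferredLocations) {
      // Executor-level preference (analogous to ExecutorCacheTaskLocation)
      loc.executorId.foreach { execId =>
        pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer) += index
      }
      // Host-level preference is always recorded
      pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
    }
    if (preferredLocations.isEmpty) pendingTasksWithNoPrefs += index
    allPendingTasks += index
  }

  def main(args: Array[String]): Unit = {
    addPendingTask(0, Seq(TaskLocation("host-a", Some("exec-1"))))
    addPendingTask(1, Seq(TaskLocation("host-b")))
    addPendingTask(2, Nil)
    println(s"byExecutor=$pendingTasksForExecutor byHost=$pendingTasksForHost " +
      s"noPrefs=$pendingTasksWithNoPrefs all=$allPendingTasks")
  }
}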
In addition, the TaskSchedulerImpl referenced from the TaskSetManager holds the following structures:
// Number of tasks running on each executor
private val executorIdToTaskCount = new HashMap[String, Int]
// The set of executors on each host; used to decide when data locality on a host is attainable
protected val executorsByHost = new HashMap[String, HashSet[String]]
// The set of hosts in each rack
protected val hostsByRack = new HashMap[String, HashSet[String]]
They reflect the current executor/host/rack relationships in the cluster. Inside computeValidLocalityLevels(), the tasks' location preferences on one side and these cluster-level structures on the other are enough to determine which TaskLocality levels the TaskSet may use; the detailed flow is left to the reader, but the sketch below compresses the decision.
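A condensed sketch of that decision, under stated simplifications: toy pending maps, alive-ness reduced to set membership, and the getLocalityWait != 0 checks omitted:

import scala.collection.mutable.{ArrayBuffer, HashMap}

object LocalityLevelsDemo {
  // The same level names Spark uses, ordered from most to least local
  object Level extends Enumeration {
    val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
  }

  def validLevels(
      pendingForExecutor: HashMap[String, ArrayBuffer[Int]],
      pendingForHost: HashMap[String, ArrayBuffer[Int]],
      pendingNoPrefs: ArrayBuffer[Int],
      pendingForRack: HashMap[String, ArrayBuffer[Int]],
      aliveExecutors: Set[String],
      aliveHosts: Set[String],
      aliveRacks: Set[String]): Seq[Level.Value] = {
    val levels = new ArrayBuffer[Level.Value]
    // A level is valid only if some pending task could actually run at that level right now
    if (pendingForExecutor.keySet.exists(aliveExecutors)) levels += Level.PROCESS_LOCAL
    if (pendingForHost.keySet.exists(aliveHosts)) levels += Level.NODE_LOCAL
    if (pendingNoPrefs.nonEmpty) levels += Level.NO_PREF
    if (pendingForRack.keySet.exists(aliveRacks)) levels += Level.RACK_LOCAL
    levels += Level.ANY // ANY is always usable as the last resort
    levels.toSeq
  }

  def main(args: Array[String]): Unit = {
    val byExec = HashMap("exec-1" -> ArrayBuffer(0))
    val byHost = HashMap("host-a" -> ArrayBuffer(0), "host-b" -> ArrayBuffer(1))
    val noPref = ArrayBuffer(2)
    val byRack = new HashMap[String, ArrayBuffer[Int]]
    // Prints List(PROCESS_LOCAL, NODE_LOCAL, NO_PREF, ANY) for this toy state
    println(validLevels(byExec, byHost, noPref, byRack,
      aliveExecutors = Set("exec-1"), aliveHosts = Set("host-a"), aliveRacks = Set.empty))
  }
}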
Here we only discuss getLocalityWait(), which returns how long the TaskSetManager is willing to wait to launch the next task at the given locality level. The code is:
private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
  // Global default wait, 3 seconds
  val defaultWait = conf.get("spark.locality.wait", "3s")
  // Pick the per-level configuration key
  val localityWaitKey = level match {
    case TaskLocality.PROCESS_LOCAL => "spark.locality.wait.process"
    case TaskLocality.NODE_LOCAL => "spark.locality.wait.node"
    case TaskLocality.RACK_LOCAL => "spark.locality.wait.rack"
    case _ => null
  }

  if (localityWaitKey != null) {
    conf.getTimeAsMs(localityWaitKey, defaultWait)
  } else {
    // NO_PREF and ANY never wait
    0L
  }
}
Each locality level reads its own parameter. Why should a TaskSetManager wait, per locality level, before launching the next task at a less local level? Let us leave that as a small open question for now; the sketch below shows just the lookup.
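A minimal sketch of the lookup, with a plain Map standing in for SparkConf, wait times already parsed to milliseconds, and the values assumed for illustration; the configuration keys themselves are the ones used in the code above:

object LocalityWaitDemo {
  // Hypothetical config, values already in milliseconds; spark.locality.wait is the default
  val conf: Map[String, Long] = Map(
    "spark.locality.wait" -> 3000L,
    "spark.locality.wait.node" -> 1000L)

  sealed trait Level
  case object ProcessLocal extends Level
  case object NodeLocal extends Level
  case object RackLocal extends Level
  case object NoPref extends Level

  // Per-level key falls back to the global default; NO_PREF (and ANY) never wait
  def localityWaitMs(level: Level): Long = {
    val default = conf.getOrElse("spark.locality.wait", 3000L)
    level match {
      case ProcessLocal => conf.getOrElse("spark.locality.wait.process", default)
      case NodeLocal    => conf.getOrElse("spark.locality.wait.node", default)
      case RackLocal    => conf.getOrElse("spark.locality.wait.rack", default)
      case NoPref       => 0L
    }
  }

  def main(args: Array[String]): Unit = {
    println(localityWaitMs(ProcessLocal)) // 3000: falls back to the default
    println(localityWaitMs(NodeLocal))    // 1000: overridden per level
    println(localityWaitMs(NoPref))       // 0: no waiting for tasks with no preference
  }
}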
Back in recomputeLocality(), computeValidLocalityLevels() is called to recompute the currently valid locality levels. Why recompute? Because a new slave has joined, we need to re-evaluate the relationship between the tasks' location preferences and the cluster's executors, hosts and racks; the effect is a dynamic adjustment of the locality levels.
After that, the per-level wait times localityWaits are recomputed and currentLocalityIndex, the index of the level currently in use, is reset; no further explanation needed.
That finishes step 7; some of the details will be collected and organized in later posts.
Now step 8: loop over sortedTaskSets and schedule each TaskSet according to the locality rules so that task locality is maximized, i.e. call resourceOfferSingleTaskSet() for each taskSet. So we first need to look at resourceOfferSingleTaskSet(). The code is:
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
  // Whether at least one task was launched in this pass
  var launchedTask = false

  // Loop over every (shuffled) offer, i.e. every available executor
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    // Only consider this executor if it still has enough free CPUs for one task
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        // Ask the TaskSetManager for a task to run on this executor at the given locality
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          // A task was assigned: record it in the per-offer buffer
          tasks(i) += task
          // Update the bookkeeping structures
          val tid = task.taskId
          taskIdToTaskSetManager(tid) = taskSet
          taskIdToExecutorId(tid) = execId
          executorIdToTaskCount(execId) += 1
          executorsByHost(host) += execId
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          // Do not offer further resources for this task set, but keep what was launched
          return launchedTask
      }
    }
  }
  return launchedTask
}
The main flow of the method:
1. Initialize the flag launchedTask to false; it marks whether any task was successfully assigned (launched);
2. Loop over shuffledOffers, i.e. each available executor:
2.1. Get its executorId and host;
2.2. If the executor's available CPUs are at least the number one task needs, continue with the assignment;
2.3. Call the TaskSetManager's resourceOffer() method and handle each returned TaskDescription:
2.3.1. The task was assigned, so append it to the corresponding slot of tasks (remember that tasks is the initially empty, structured list built from shuffledOffers and their cores);
2.3.2. Update taskIdToTaskSetManager, taskIdToExecutorId, executorIdToTaskCount, executorsByHost, availableCpus and the other structures;
2.3.3. Assert that availableCpus(i) is not negative;
2.3.4. Set the flag launchedTask to true;
3. Return launchedTask. A toy allocator after this list illustrates the loop.
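The loop can be illustrated with a toy allocator, a hedged sketch in which the task queue, the offers and CPUS_PER_TASK are all made-up values; it hands each offer at most one task per pass, exactly like the method above, and the caller keeps making passes while anything was launched:

import scala.collection.mutable.ArrayBuffer

object SingleTaskSetOfferDemo {
  case class WorkerOffer(executorId: String, host: String, cores: Int)

  val CPUS_PER_TASK = 1                    // hypothetical; spark.task.cpus defaults to 1
  val pending = ArrayBuffer(0, 1, 2, 3, 4) // toy task indices still waiting to run

  // One pass over the shuffled offers: give each offer at most one task if it has CPUs left
  def offerSinglePass(
      offers: Seq[WorkerOffer],
      availableCpus: Array[Int],
      assigned: Array[ArrayBuffer[Int]]): Boolean = {
    var launchedTask = false
    for (i <- offers.indices) {
      if (availableCpus(i) >= CPUS_PER_TASK && pending.nonEmpty) {
        val task = pending.remove(0)
        assigned(i) += task
        availableCpus(i) -= CPUS_PER_TASK
        assert(availableCpus(i) >= 0)
        launchedTask = true
      }
    }
    launchedTask
  }

  def main(args: Array[String]): Unit = {
    val offers = Seq(WorkerOffer("exec-1", "host-a", 2), WorkerOffer("exec-2", "host-b", 1))
    val availableCpus = offers.map(_.cores).toArray
    val assigned = Array.fill(offers.size)(new ArrayBuffer[Int])
    // Keep offering passes while at least one task was launched, like the caller's do/while loop
    while (offerSinglePass(offers, availableCpus, assigned)) {}
    // exec-1 ends up with two tasks, exec-2 with one; the rest stay pending (CPUs exhausted)
    println(assigned.map(_.mkString("[", ",", "]")).mkString(" "))
  }
}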
Everything else is straightforward, so let us only look at TaskSetManager's resourceOffer() method. The code is:
@throws[TaskNotSerializableException]
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality)
  : Option[TaskDescription] =
{
  if (!isZombie) {
    // Record the current time
    val curTime = clock.getTimeMillis()

    // Determine the locality level we are allowed to use
    var allowedLocality = maxLocality
    if (maxLocality != TaskLocality.NO_PREF) {
      // Delay scheduling: compute the level we may use based on how long we have waited
      allowedLocality = getAllowedLocalityLevel(curTime)
      if (allowedLocality > maxLocality) {
        // We are not allowed to search for farther-away tasks than maxLocality
        allowedLocality = maxLocality
      }
    }

    // Dequeue (i.e. pick) a task for this executor/host at the allowed locality
    dequeueTask(execId, host, allowedLocality) match {
      case Some((index, taskLocality, speculative)) => {
        // A task was found: do the bookkeeping for launching it
        val task = tasks(index)
        val taskId = sched.newTaskId()
        // One more copy of this task is now running
        copiesRunning(index) += 1
        val attemptNum = taskAttempts(index).size
        val info = new TaskInfo(taskId, index, attemptNum, curTime,
          execId, host, taskLocality, speculative)
        taskInfos(taskId) = info
        taskAttempts(index) = info :: taskAttempts(index)

        // Update our locality level for delay scheduling; NO_PREF tasks do not reset the timer
        if (maxLocality != TaskLocality.NO_PREF) {
          currentLocalityIndex = getLocalityIndex(taskLocality)
          lastLaunchTime = curTime
        }

        // Serialize the task together with its file/jar dependencies
        val startTime = clock.getTimeMillis()
        val serializedTask: ByteBuffer = try {
          Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
        } catch {
          // If serialization fails, abort the whole task set rather than retrying the task
          case NonFatal(e) =>
            val msg = s"Failed to serialize task $taskId, not attempting to retry it."
            logError(msg, e)
            abort(s"$msg Exception during serialization: $e")
            throw new TaskNotSerializableException(e)
        }
        // Warn once if the serialized task is suspiciously large
        if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
            !emittedTaskSizeWarning) {
          emittedTaskSizeWarning = true
          logWarning(s"Stage ${task.stageId} contains a task of very large size " +
            s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
            s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
        }

        // Mark the task as running in this TaskSetManager
        addRunningTask(taskId)

        val taskName = s"task ${info.id} in stage ${taskSet.id}"
        logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," +
          s"$taskLocality, ${serializedTask.limit} bytes)")

        // Tell the DAGScheduler that the task has started
        sched.dagScheduler.taskStarted(task, info)

        // Return the TaskDescription that will be sent to the executor
        return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
          taskName, index, serializedTask))
      }
      case _ =>
    }
  }
  None
}
The overall flow of resourceOffer() is:
1. Record the current time;
2. Determine the locality level we are allowed to use: allowedLocality;
3. Dequeue a task, i.e. assign one;
3.1. If a suitable task is found, i.e. the task can be assigned:
3.1.1. Get a taskId, update copiesRunning, get attemptNum, create the TaskInfo, update taskInfos and taskAttempts, and set currentLocalityIndex and lastLaunchTime, i.e. update the basic bookkeeping structures;
3.1.2. Serialize the task to obtain serializedTask;
3.1.3. Add it to the running tasks;
3.1.4. Call DAGScheduler's taskStarted() method to mark the task as started;
3.1.5. Return the TaskDescription, which carries taskId, attemptNumber, execId, index, serializedTask and other important information; attemptNumber is what speculative execution relies on: a straggling task may run as several copies and whichever finishes first wins.
First, about allowedLocality: if maxLocality is not TaskLocality.NO_PREF, getAllowedLocalityLevel() is called with the current time to obtain allowedLocality, and the result is capped at maxLocality. The logic of getAllowedLocalityLevel() is fairly simple:
private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
  // Does this list still contain a task that needs to be scheduled
  // (not already running and not finished)? Clean up stale entries as we go.
  def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
    var indexOffset = pendingTaskIds.size
    while (indexOffset > 0) {
      indexOffset -= 1
      val index = pendingTaskIds(indexOffset)
      if (copiesRunning(index) == 0 && !successful(index)) {
        return true
      } else {
        // The task is running or done, so lazily remove it from the pending list
        pendingTaskIds.remove(indexOffset)
      }
    }
    false
  }

  // Does any list in this map still contain schedulable tasks?
  // Keys whose lists are exhausted are removed lazily.
  def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
    val emptyKeys = new ArrayBuffer[String]
    val hasTasks = pendingTasks.exists {
      case (id: String, tasks: ArrayBuffer[Int]) =>
        if (tasksNeedToBeScheduledFrom(tasks)) {
          true
        } else {
          emptyKeys += id
          false
        }
    }
    // Drop the keys that no longer have pending tasks
    emptyKeys.foreach(id => pendingTasks.remove(id))
    hasTasks
  }

  // Walk up from the current locality level until we find one we are allowed to use
  while (currentLocalityIndex < myLocalityLevels.length - 1) {
    // Are there still tasks to run at the current level?
    val moreTasks = myLocalityLevels(currentLocalityIndex) match {
      case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
      case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
      case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
      case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
    }
    if (!moreTasks) {
      // No tasks left at this level: move to the next level right away
      // and restart the wait timer from now
      lastLaunchTime = curTime
      logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
        s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
      currentLocalityIndex += 1
    } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
      // There are tasks at this level, but we have waited longer than this level's
      // wait time without launching one, so relax to the next level
      lastLaunchTime += localityWaits(currentLocalityIndex)
      currentLocalityIndex += 1
      logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex)} after waiting for " +
        s"${localityWaits(currentLocalityIndex)}ms")
    } else {
      // Still within the wait for the current level: use it
      return myLocalityLevels(currentLocalityIndex)
    }
  }
  // We have reached the last (least local) level
  myLocalityLevels(currentLocalityIndex)
}
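Before looking at dequeueTask(), the core of the delay-scheduling decision above can be compressed into a few lines. This is a simplified sketch with toy levels and wait times, and it assumes every level still has pending tasks, so the index only advances when the wait has been exceeded:

object DelaySchedulingDemo {
  // Valid levels for a hypothetical task set, most local first, with their wait times in ms
  val myLocalityLevels = Array("PROCESS_LOCAL", "NODE_LOCAL", "ANY")
  val localityWaits = Array(3000L, 3000L, 0L)

  var currentLocalityIndex = 0
  var lastLaunchTime = 0L

  // Simplified getAllowedLocalityLevel: only the wait-time branch is modeled
  def allowedLevel(curTime: Long): String = {
    while (currentLocalityIndex < myLocalityLevels.length - 1 &&
           curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
      lastLaunchTime += localityWaits(currentLocalityIndex)
      currentLocalityIndex += 1
    }
    myLocalityLevels(currentLocalityIndex)
  }

  def main(args: Array[String]): Unit = {
    println(allowedLevel(1000L)) // PROCESS_LOCAL: still within the 3s wait
    println(allowedLevel(3500L)) // NODE_LOCAL: 3s elapsed without a local launch, relax one level
    println(allowedLevel(7000L)) // ANY: waited through NODE_LOCAL's 3s as well
  }
}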
Once allowedLocality is determined, dequeueTask() is called to dequeue a task, i.e. pick one for scheduling. The code is:
private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
  : Option[(Int, TaskLocality.Value, Boolean)] =
{
  // First try tasks that want exactly this executor (PROCESS_LOCAL)
  for (index <- dequeueTaskFromList(execId, getPendingTasksForExecutor(execId))) {
    return Some((index, TaskLocality.PROCESS_LOCAL, false))
  }

  // Then tasks that want this host (NODE_LOCAL), if the allowed level permits it
  if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
    for (index <- dequeueTaskFromList(execId, getPendingTasksForHost(host))) {
      return Some((index, TaskLocality.NODE_LOCAL, false))
    }
  }

  // Then tasks with no preference; they are reported as PROCESS_LOCAL for bookkeeping
  if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
    for (index <- dequeueTaskFromList(execId, pendingTasksWithNoPrefs)) {
      return Some((index, TaskLocality.PROCESS_LOCAL, false))
    }
  }

  // Then tasks that want this host's rack (RACK_LOCAL)
  if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
    for {
      rack <- sched.getRackForHost(host)
      index <- dequeueTaskFromList(execId, getPendingTasksForRack(rack))
    } {
      return Some((index, TaskLocality.RACK_LOCAL, false))
    }
  }

  // Finally any pending task at all (ANY)
  if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
    for (index <- dequeueTaskFromList(execId, allPendingTasks)) {
      return Some((index, TaskLocality.ANY, false))
    }
  }

  // Nothing pending could be dequeued: look for a speculative task as a last resort
  dequeueSpeculativeTask(execId, host, maxLocality).map {
    case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
}
Very simple: tasks are dequeued in the order PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY. Finally, if no pending task can be dequeued at any of the allowed levels, a speculative task is looked for, the same idea as MapReduce's speculative execution. The sketch below replays that fallback order.
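A toy replay of the fallback, with hypothetical per-level queues; the real dequeueTask() works on the pending maps shown earlier, but the walk from most local to least local allowed level is the same:

import scala.collection.mutable.ArrayBuffer

object DequeueOrderDemo {
  // Toy pending queues keyed by locality level, most local first (hypothetical contents)
  val queues: Seq[(String, ArrayBuffer[Int])] = Seq(
    "PROCESS_LOCAL" -> ArrayBuffer.empty[Int],
    "NODE_LOCAL"    -> ArrayBuffer(7),
    "NO_PREF"       -> ArrayBuffer(3),
    "RACK_LOCAL"    -> ArrayBuffer.empty[Int],
    "ANY"           -> ArrayBuffer(1, 2))

  // Walk the levels in order and take the first task found, like dequeueTask()
  def dequeue(maxLocality: String): Option[(Int, String)] = {
    val allowed = queues.map(_._1).takeWhile(_ != maxLocality) :+ maxLocality
    queues.collectFirst {
      case (level, q) if allowed.contains(level) && q.nonEmpty => (q.remove(0), level)
    }
  }

  def main(args: Array[String]): Unit = {
    println(dequeue("ANY"))        // Some((7,NODE_LOCAL)): the most local non-empty queue wins
    println(dequeue("NODE_LOCAL")) // None: the two allowed queues are now both empty
  }
}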
At this point we have obtained the TaskDescription, i.e. we know which Task should run on which node, and the whole Task scheduling path has been covered.
A closing remark:
Explaining a complex flow thoroughly and clearly takes a lot of effort, and a handful of posts is far from enough. These two posts on Task scheduling focus on laying out a complete flow while explaining part of the details. There are surely many details that are not explained clearly or thoroughly, and possibly some misunderstandings; corrections from experienced readers are very welcome so that others are not misled.
I will keep publishing posts that dig into the remaining details, deepen the understanding of the flow, and summarize it. Thank you all!
Original blog post: http://blog.csdn.net/lipeng_bigdata/article/details/50699939