An in-depth analysis using wordcount as an example
import org.apache.spark.{SparkConf, SparkContext}

object wordcount {

  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setAppName("wordcount").setMaster("local")

    val sc = new SparkContext(conf)
    // Produces HadoopRDD -> MapPartitionsRDD
    val lines = sc.textFile("C://Users//Administrator//Desktop//wordcount.txt", 1)
    // Produces FlatMappedRDD
    val words = lines.flatMap(line => line.split(" "))
    // Produces MapPartitionsRDD
    val pairs = words.map(word => (word, 1))
    // Produces MapPartitionsRDD -> ShuffledRDD -> MapPartitionsRDD, i.e. three RDDs
    val result = pairs.reduceByKey(_ + _)
    // foreach is an action; it triggers a job via SparkContext's runJob method (DAGScheduler)
    result.foreach(count => println(count))
  }
}
Notes:
1. Internally, textFile first creates a HadoopRDD via hadoopFile (in key-value form, where the key is the byte offset of each line in the text file and the value is the line's content), and then converts it into a MapPartitionsRDD (whose elements contain only the line contents).
2. RDD itself does not define reduceByKey, so calling reduceByKey() on an RDD triggers Scala's implicit conversion mechanism: the compiler looks for an implicit conversion in scope, finds rddToPairRDDFunctions(), and converts the RDD into a PairRDDFunctions. The sketch after these notes illustrates both points.
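Both notes can be checked with a short, hand-expanded version of the same pipeline. The sketch below is my own illustration, not the Spark source (the object name TextFileExpanded is mine, and the file path is simply reused from the example above): it builds the HadoopRDD explicitly through sc.hadoopFile and then maps away the offset key, which is essentially what textFile does internally, while its reduceByKey call still resolves through the rddToPairRDDFunctions implicit conversion.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object TextFileExpanded {

  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("textFile-sketch").setMaster("local"))

    // HadoopRDD: key = byte offset of each line, value = the line itself
    val hadoopRdd = sc.hadoopFile("C://Users//Administrator//Desktop//wordcount.txt",
      classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 1)

    // Dropping the offset and keeping only the text gives the MapPartitionsRDD
    // that textFile actually returns
    val lines = hadoopRdd.map(pair => pair._2.toString)

    // reduceByKey is not defined on RDD itself; the implicit conversion
    // rddToPairRDDFunctions wraps the RDD[(String, Int)] in PairRDDFunctions first
    val counts = lines.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    counts.foreach(println)

    sc.stop()
  }
}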
The stage division algorithm
Starting from the RDD on which the action is triggered, work backwards through the lineage. A stage is first created for the last RDD. Continuing backwards, whenever an RDD is reached through a wide (shuffle) dependency, a new stage is created for that wide-dependency RDD, and that RDD becomes the last RDD of the new stage. This continues in the same way, dividing stages by narrow and wide dependencies, until every RDD has been traversed.
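For the wordcount program above this yields exactly two stages: the shuffle introduced by reduceByKey cuts the lineage, so the textFile/flatMap/map chain forms a shuffle map stage and the shuffled RDD plus the final action form the result stage. A quick way to see the boundary is to dump the lineage before the action is triggered; the snippet below is only a usage sketch, and the exact RDD names, ids, and output format vary across Spark versions.

// Add this line to the wordcount main() above, just before result.foreach(...):
println(result.toDebugString)
// The indentation step at the shuffled RDD marks the stage boundary, roughly:
// (1) ShuffledRDD[...] at reduceByKey ...        <- result (final) stage
//  +-(1) MapPartitionsRDD[...] at map ...        <- shuffle map stage
//     |  MapPartitionsRDD[...] at flatMap ...
//     |  MapPartitionsRDD[...] at textFile ...
//     |  HadoopRDD[...] at textFile ...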
Why stages are divided
Every application submitted to Spark is executed in the form of jobs. After a job is submitted it is divided into multiple stages, each stage is wrapped into a TaskSet, and the TaskSet is submitted to the TaskScheduler to be executed on executors.
Source code analysis
Execution flow after the action in the wordcount program above:
foreach(RDD.scala) -> runJob(SparkContext.scala) -> runJob(DAGScheduler.scala) -> submitJob(DAGScheduler.scala) -> eventProcessLoop.post sends JobSubmitted(DAGScheduler.scala) -> onReceive(DAGScheduler.scala) -> case JobSubmitted -> handleJobSubmitted (entry point)
The DAGScheduler implementation class lives in the package org.apache.spark.scheduler.
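The first link of that chain can be reproduced from the driver side. The snippet below is my own sketch, not Spark source (the object name RunJobSketch and the sample data are made up): it calls the public SparkContext.runJob overload directly with the same kind of iterator-level closure that RDD.foreach builds internally; that call is what eventually reaches the DAGScheduler as a JobSubmitted event.

import org.apache.spark.{SparkConf, SparkContext}

object RunJobSketch {

  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("runJob-sketch").setMaster("local"))
    val result = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1))).reduceByKey(_ + _)

    // Equivalent to result.foreach(println): foreach wraps the closure into an
    // Iterator => Unit function and hands it to SparkContext.runJob, which in turn
    // posts a JobSubmitted event to the DAGScheduler's event loop
    sc.runJob(result, (iter: Iterator[(String, Int)]) => iter.foreach(println))

    sc.stop()
  }
}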
handleJobSubmitted
Purpose: analyzes stage dependencies, creates the stages, and submits the corresponding job.
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    allowLocal: Boolean,
    callSite: CallSite,
    listener: JobListener,
    properties: Properties = null)
{
  var finalStage: Stage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    // Create the finalStage from the job's last RDD and register it in the
    // DAGScheduler's internal cache (stageIdToStage)
    finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  if (finalStage != null) {
    // Create a Job from finalStage, which is the job's last stage
    val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
      job.jobId, callSite.shortForm, partitions.length, allowLocal))
    logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))
    val shouldRunLocally =
      localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
    val jobSubmissionTime = clock.getTimeMillis()
    // Jobs with no parent stages (and a single partition) are run locally
    if (shouldRunLocally) {
      // Compute very short actions like first() or take() with no parent stages locally.
      listenerBus.post(
        SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
      // Run the job locally
      runLocally(job)
    } else {
      // Register the job in the in-memory caches
      jobIdToActiveJob(jobId) = job
      activeJobs += job
      finalStage.resultOfJob = Some(job)
      val stageIds = jobIdToStageIds(jobId).toArray
      val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
      listenerBus.post(
        SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
      // Submit the stage; all stages end up in the waitingStages queue
      submitStage(finalStage)
    }
  }
  submitWaitingStages()
}
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      // Get the parent stages of the current stage
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing == Nil) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        // Create tasks for the stage; the number of tasks equals the number of partitions
        submitMissingTasks(stage, jobId.get)
      } else {
        // Submit the parent stages first
        for (parent <- missing) {
          submitStage(parent)
        }
        // Put this stage into the waitingStages cache
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id)
  }
}
// The concrete implementation of the stage division algorithm.
// If all dependencies of a stage's last RDD are narrow, no new stage is created;
// if a wide (shuffle) dependency is found, a new stage is created for that RDD and returned.
private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]
  def visit(rdd: RDD[_]) {
    if (!visited(rdd)) {
      visited += rdd
      if (getCacheLocs(rdd).contains(Nil)) {
        // Iterate over the RDD's dependencies
        for (dep <- rdd.dependencies) {
          dep match {
            // Wide (shuffle) dependency
            case shufDep: ShuffleDependency[_, _, _] =>
              // Create a stage with isShuffleMap set to true
              val mapStage = getShuffleMapStage(shufDep, stage.jobId)
              if (!mapStage.isAvailable) {
                // Cache the newly created stage in missing
                missing += mapStage
              }
            // Narrow dependency
            case narrowDep: NarrowDependency[_] =>
              // Push the parent RDD onto the stack
              waitingForVisit.push(narrowDep.rdd)
          }
        }
      }
    }
  }
  // Push the stage's last RDD onto the waitingForVisit stack
  waitingForVisit.push(stage.rdd)
  while (!waitingForVisit.isEmpty) {
    visit(waitingForVisit.pop())
  }
  // Return the list of missing parent stages
  missing.toList
}
// Create a batch of tasks for the stage; the number of tasks equals the number of partitions
private def submitMissingTasks(stage: Stage, jobId: Int) {
  logDebug("submitMissingTasks(" + stage + ")")
  // Get our pending tasks and remember them in our pendingTasks entry
  stage.pendingTasks.clear()

  // First figure out the indexes of partition ids to compute.
  // Determine which partitions still need tasks
  val partitionsToCompute: Seq[Int] = {
    if (stage.isShuffleMap) {
      (0 until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil)
    } else {
      val job = stage.resultOfJob.get
      (0 until job.numPartitions).filter(id => !job.finished(id))
    }
  }

  ................................

  // Add the stage to the runningStages cache
  runningStages += stage

  ................................

  // Create the required number of tasks for the stage and compute each task's preferred location
  val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {
    partitionsToCompute.map { id =>
      // Compute the preferred locations
      val locs = getPreferredLocs(stage.rdd, id)
      val part = stage.rdd.partitions(id)
      // Create a ShuffleMapTask
      new ShuffleMapTask(stage.id, taskBinary, part, locs)
    }
  } else {
    val job = stage.resultOfJob.get
    partitionsToCompute.map { id =>
      val p: Int = job.partitions(id)
      val part = stage.rdd.partitions(p)
      val locs = getPreferredLocs(stage.rdd, p)
      // Create a ResultTask for the final stage
      new ResultTask(stage.id, taskBinary, part, locs, id)
    }
  }

  if (tasks.size > 0) {
    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    stage.pendingTasks ++= tasks
    logDebug("New pending tasks: " + stage.pendingTasks)
    // Wrap the stage's tasks in a TaskSet and submit it via TaskScheduler.submitTasks()
    taskScheduler.submitTasks(
      new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  }

  ......................
}
getPreferredLocsInternal
Purpose:
Computes the preferred location for the partition each task processes. Starting from the stage's last RDD, it checks whether the RDD's partition has been cached or checkpointed; if so, the task's preferred location is the location of that cached or checkpointed partition.
Call chain:
submitMissingTasks->getPreferredLocs->getPreferredLocsInternal
// Compute the preferred location of the partition each task will process.
// Starting from the stage's last RDD, check whether the RDD's partition is cached or
// checkpointed; if so, that cached/checkpointed partition's location becomes the task's
// preferred location
private def getPreferredLocsInternal(
    rdd: RDD[_],
    partition: Int,
    visited: HashSet[(RDD[_], Int)])
  : Seq[TaskLocation] =
{
  // If the partition has already been visited, no need to re-visit.
  // This avoids exponential path exploration. SPARK-695
  if (!visited.add((rdd, partition))) {
    // Nil has already been returned for previously visited partitions.
    return Nil
  }
  // If the partition is cached, return the cache locations
  // Check whether the RDD is cached
  val cached = getCacheLocs(rdd)(partition)
  if (!cached.isEmpty) {
    return cached
  }
  // If the RDD has some placement preferences (as is the case for input RDDs), get those
  // Check whether the current RDD has preferred locations (e.g. it is checkpointed)
  val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
  if (!rddPrefs.isEmpty) {
    return rddPrefs.map(TaskLocation(_))
  }
  // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
  // that has any placement preferences. Ideally we would choose based on transfer sizes,
  // but this will do for now.
  // Recurse into the parent RDDs and check whether the corresponding partitions
  // are cached or checkpointed
  rdd.dependencies.foreach {
    case n: NarrowDependency[_] =>
      for (inPart <- n.getParents(partition)) {
        val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
        if (locs != Nil) {
          return locs
        }
      }
    case _ =>
  }
  // If no partition from the stage's last RDD back to the first is cached or checkpointed,
  // the task has no preferred location (Nil)
  Nil
}
Original source: http://www.cnblogs.com/jianyuan/p/Spark系列之DAGScheduler工作原理.html