标签:stage划分算法
stage划分算法总结
最后一个RDD创建finalstage
finalstage倒推
通过宽依赖,来进行新的stage划分
使用递归,依次提交stage,从父stage开始
源码 org.apache.spark.scheduler包下
stage划分算法由 submitStage和getMissingParentStages方法组成
第一步:使用触发job的最后一个RDD,创建finalstage,传入到newstage方法中
var finalStage: Stage = null
//创建一个stage对象,并且将stage加入到DAGscheduler中
finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
第二步:用finalstage创建一个job,也就是说,这个job的最后一个stage,当然就是finalstage
val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
第三步:将job加入到内存缓存中
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.resultOfJob = Some(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
第四步:使用submitStage方法提交finalstage(尝试)
submitStage(finalStage)
//调用getMissingParentStages方法,去获取当前stage的父stage
val missing = getMissingParentStages(stage).sortBy(_.id)
//首先往栈中,推入了最后一个RDD
waitingForVisit.push(stage.rdd)
//然后进行while循环,调用自己内部定义的visit()方法
while (!waitingForVisit.isEmpty) {
visit(waitingForVisit.pop())
}
在visit()方法内,遍历RDD的依赖
for (dep <- rdd.dependencies)
如果是窄依赖,那么将依赖的RDD放入栈中
case narrowDep: NarrowDependency[_] =>
waitingForVisit.push(narrowDep.rdd)
如果是宽依赖,那么使用依赖的RDD创建一个新的stage,并且会将isShuffleMap设置为true
(默认的最后一个stage不是shuffleMap stage)
除了finalstage都是shuffleMap stage
case shufDep: ShuffleDependency[_, _, _] =>
val mapStage = getShuffleMapStage(shufDep, stage.jobId)
if (missing == Nil) {
//如果吗没有父stage则执行
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
submitMissingTasks(stage, jobId.get)
} else {
//递归调用submit方法去去提交父stage
for (parent <- missing) {
submitStage(parent)
}
//并且将当前stage放入等待执行的stage队列中
waitingStages += stage
}
/*
* 提交stage的方法
*/
private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
//调用getMissingParentStages方法,去获取当前stage的父stage
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing == Nil) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
submitMissingTasks(stage, jobId.get)
} else {
for (parent <- missing) {
submitStage(parent)
}
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id)
}
}
/*
* 获取某个stage的父stage方法
*/
private def getMissingParentStages(stage: Stage): List[Stage] = {
val missing = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new Stack[RDD[_]]
def visit(rdd: RDD[_]) {
if (!visited(rdd)) {
visited += rdd
if (getCacheLocs(rdd).contains(Nil)) {
for (dep <- rdd.dependencies) {
//遍历RDD的父依赖
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
val mapStage = getShuffleMapStage(shufDep, stage.jobId)
if (!mapStage.isAvailable) {
missing += mapStage
}
case narrowDep: NarrowDependency[_] =>
waitingForVisit.push(narrowDep.rdd)
}
}
}
}
}
//首先往栈中,推入了最后一个RDD
waitingForVisit.push(stage.rdd)
//然后进行循环,调用自己内部定义的visit()方法
while (!waitingForVisit.isEmpty) {
visit(waitingForVisit.pop())
}
missing.toList
}
标签:stage划分算法
原文地址:http://beyond520.blog.51cto.com/10540356/1922409