标签:des style http color io os ar java for
从org.apache.spark.scheduler.DAGScheduler#submitMissingTasks开始,分析Stage是如何生成TaskSet的。
如果一个Stage的所有的parent stage都已经计算完成或者存在于cache中,那么他会调用submitMissingTasks来提交该Stage所包含的Tasks。
org.apache.spark.scheduler.DAGScheduler#submitMissingTasks的计算流程如下:
- 首先得到RDD中需要计算的partition,对于Shuffle类型的stage,需要判断stage中是否缓存了该结果;对于Result类型的Final Stage,则判断计算Job中该partition是否已经计算完成。
- 序列化task的binary。Executor可以通过广播变量得到它。每个task运行的时候首先会反序列化。这样在不同的executor上运行的task是隔离的,不会相互影响。
- 为每个需要计算的partition生成一个task:对于Shuffle类型依赖的Stage,生成ShuffleMapTask类型的task;对于Result类型的Stage,生成一个ResultTask类型的task
- 确保Task是可以被序列化的。因为不同的cluster有不同的taskScheduler,在这里判断可以简化逻辑;保证TaskSet的task都是可以序列化的
- 通过TaskScheduler提交TaskSet。
TaskSet
就是可以做pipeline的一组完全相同的task,每个task的处理逻辑完全相同,不同的是处理数据,每个task负责处理一个
partition。pipeline,可以称为大数据处理的基石,只有数据进行pipeline处理,才能将其放到集群中去运行。对于一个task来
说,它从数据源获得逻辑,然后按照拓扑顺序,顺序执行(实际上是调用rdd的compute)。
TaskSet是一个数据结构,存储了这一组task:
- private[spark] class TaskSet(
- val tasks: Array[Task[_]],
- val stageId: Int,
- val attempt: Int,
- val priority: Int,
- val properties: Properties) {
- val id: String = stageId + "." + attempt
-
- override def toString: String = "TaskSet " + id
- }
管理调度这个TaskSet的时org.apache.spark.scheduler.TaskSetManager,TaskSetManager会负责task的失败重试;跟踪每个task的执行状态;处理locality-aware的调用。
详细的调用堆栈如下:
- org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks
- org.apache.spark.scheduler.SchedulableBuilder#addTaskSetManager
- org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#reviveOffers
- org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverActor#makeOffers
- org.apache.spark.scheduler.TaskSchedulerImpl#resourceOffers
- org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverActor#launchTasks
- org.apache.spark.executor.CoarseGrainedExecutorBackend.receiveWithLogging#launchTask
- org.apache.spark.executor.Executor#launchTask
首先看一下org.apache.spark.executor.Executor#launchTask:
- def launchTask(
- context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {
- val tr = new TaskRunner(context, taskId, taskName, serializedTask)
- runningTasks.put(taskId, tr)
- threadPool.execute(tr)
- }
TaskRunner会从序列化的task中反序列化得到task,这个需要看 org.apache.spark.executor.Executor.TaskRunner#run 的实现:task.run(taskId.toInt)。而task.run的实现是:
- final def run(attemptId: Long): T = {
- context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)
- context.taskMetrics.hostname = Utils.localHostName()
- taskThread = Thread.currentThread()
- if (_killed) {
- kill(interruptThread = false)
- }
- runTask(context)
- }
对于原来提到的两种Task,即
- org.apache.spark.scheduler.ShuffleMapTask
- org.apache.spark.scheduler.ResultTask
分别实现了不同的runTask:
org.apache.spark.scheduler.ResultTask#runTask即顺序调用rdd的compute,通过rdd的拓扑顺序依次对partition进行计算:
- override def runTask(context: TaskContext): U = {
-
- val ser = SparkEnv.get.closureSerializer.newInstance()
- val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
- ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
-
- metrics = Some(context.taskMetrics)
- try {
- func(context, rdd.iterator(partition, context))
- } finally {
- context.markTaskCompleted()
- }
- }
而org.apache.spark.scheduler.ShuffleMapTask#runTask则是写shuffle的结果,
- override def runTask(context: TaskContext): MapStatus = {
-
- val ser = SparkEnv.get.closureSerializer.newInstance()
- val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
- ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
-
-
- metrics = Some(context.taskMetrics)
- var writer: ShuffleWriter[Any, Any] = null
- try {
- val manager = SparkEnv.get.shuffleManager
- writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
- writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
- return writer.stop(success = true).get
- } catch {
- case e: Exception =>
- if (writer != null) {
- writer.stop(success = false)
- }
- throw e
- } finally {
- context.markTaskCompleted()
- }
这
两个task都不要按照拓扑顺序调用rdd的compute来完成对partition的计算,不同的是ShuffleMapTask需要shuffle
write,以供child stage读取shuffle的结果。
对于这两个task都用到的taskBinary,即为在
org.apache.spark.scheduler.DAGScheduler#submitMissingTasks序列化的task的广播变量取
得的。
Spark技术内幕: Task向Executor提交的源码解析
标签:des style http color io os ar java for
原文地址:http://www.cnblogs.com/yido9932/p/4053371.html