码迷,mamicode.com
首页 > 其他好文 > 详细

Spark技术内幕: Task向Executor提交的源码解析

时间:2014-10-27 06:56:06      阅读:249      评论:0      收藏:0      [点我收藏+]

标签:des   style   http   color   io   os   ar   java   for   

从org.apache.spark.scheduler.DAGScheduler#submitMissingTasks开始,分析Stage是如何生成TaskSet的。

如果一个Stage的所有的parent stage都已经计算完成或者存在于cache中,那么他会调用submitMissingTasks来提交该Stage所包含的Tasks。

org.apache.spark.scheduler.DAGScheduler#submitMissingTasks的计算流程如下:

  1. 首先得到RDD中需要计算的partition,对于Shuffle类型的stage,需要判断stage中是否缓存了该结果;对于Result类型的Final Stage,则判断计算Job中该partition是否已经计算完成。
  2. 序列化task的binary。Executor可以通过广播变量得到它。每个task运行的时候首先会反序列化。这样在不同的executor上运行的task是隔离的,不会相互影响。
  3. 为每个需要计算的partition生成一个task:对于Shuffle类型依赖的Stage,生成ShuffleMapTask类型的task;对于Result类型的Stage,生成一个ResultTask类型的task
  4. 确保Task是可以被序列化的。因为不同的cluster有不同的taskScheduler,在这里判断可以简化逻辑;保证TaskSet的task都是可以序列化的
  5. 通过TaskScheduler提交TaskSet。
TaskSet 就是可以做pipeline的一组完全相同的task,每个task的处理逻辑完全相同,不同的是处理数据,每个task负责处理一个 partition。pipeline,可以称为大数据处理的基石,只有数据进行pipeline处理,才能将其放到集群中去运行。对于一个task来 说,它从数据源获得逻辑,然后按照拓扑顺序,顺序执行(实际上是调用rdd的compute)。
TaskSet是一个数据结构,存储了这一组task:
[java] view plaincopybubuko.com,布布扣bubuko.com,布布扣
  1. private[spark] class TaskSet(  
  2.     val tasks: Array[Task[_]],  
  3.     val stageId: Int,  
  4.     val attempt: Int,  
  5.     val priority: Int,  
  6.     val properties: Properties) {  
  7.     val id: String = stageId + "." + attempt  
  8.   
  9.   override def toString: String = "TaskSet " + id  
  10. }  


管理调度这个TaskSet的时org.apache.spark.scheduler.TaskSetManager,TaskSetManager会负责task的失败重试;跟踪每个task的执行状态;处理locality-aware的调用。
详细的调用堆栈如下:
  1. org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks
  2. org.apache.spark.scheduler.SchedulableBuilder#addTaskSetManager
  3. org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#reviveOffers
  4. org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverActor#makeOffers
  5. org.apache.spark.scheduler.TaskSchedulerImpl#resourceOffers
  6. org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverActor#launchTasks
  7. org.apache.spark.executor.CoarseGrainedExecutorBackend.receiveWithLogging#launchTask
  8. org.apache.spark.executor.Executor#launchTask
 
首先看一下org.apache.spark.executor.Executor#launchTask:
[java] view plaincopybubuko.com,布布扣bubuko.com,布布扣
  1. def launchTask(  
  2.     context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {  
  3.   val tr = new TaskRunner(context, taskId, taskName, serializedTask)  
  4.   runningTasks.put(taskId, tr)  
  5.   threadPool.execute(tr) // 开始在executor中运行  
  6. }  


TaskRunner会从序列化的task中反序列化得到task,这个需要看 org.apache.spark.executor.Executor.TaskRunner#run 的实现:task.run(taskId.toInt)。而task.run的实现是:
[java] view plaincopybubuko.com,布布扣bubuko.com,布布扣
  1. final def run(attemptId: Long): T = {  
  2.    context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)  
  3.    context.taskMetrics.hostname = Utils.localHostName()  
  4.    taskThread = Thread.currentThread()  
  5.    if (_killed) {  
  6.      kill(interruptThread = false)  
  7.    }  
  8.    runTask(context)  
  9.  }  

对于原来提到的两种Task,即
  1.  org.apache.spark.scheduler.ShuffleMapTask
  2.  org.apache.spark.scheduler.ResultTask
分别实现了不同的runTask:
org.apache.spark.scheduler.ResultTask#runTask即顺序调用rdd的compute,通过rdd的拓扑顺序依次对partition进行计算:
[java] view plaincopybubuko.com,布布扣bubuko.com,布布扣
  1. override def runTask(context: TaskContext): U = {  
  2.   // Deserialize the RDD and the func using the broadcast variables.  
  3.   val ser = SparkEnv.get.closureSerializer.newInstance()  
  4.   val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](  
  5.     ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)  
  6.   
  7.   metrics = Some(context.taskMetrics)  
  8.   try {  
  9.     func(context, rdd.iterator(partition, context))  
  10.   } finally {  
  11.     context.markTaskCompleted()  
  12.   }  
  13. }  


而org.apache.spark.scheduler.ShuffleMapTask#runTask则是写shuffle的结果,
 
[java] view plaincopybubuko.com,布布扣bubuko.com,布布扣
  1. override def runTask(context: TaskContext): MapStatus = {  
  2.   // Deserialize the RDD using the broadcast variable.  
  3.   val ser = SparkEnv.get.closureSerializer.newInstance()  
  4.   val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](  
  5.     ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)  
  6.     //此处的taskBinary即为在org.apache.spark.scheduler.DAGScheduler#submitMissingTasks序列化的task的广播变量取得的  
  7.   
  8.   metrics = Some(context.taskMetrics)  
  9.   var writer: ShuffleWriter[Any, Any] = null  
  10.   try {  
  11.     val manager = SparkEnv.get.shuffleManager  
  12.     writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)  
  13.     writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) // 将rdd计算的结果写入memory或者disk  
  14.     return writer.stop(success = true).get  
  15.   } catch {  
  16.     case e: Exception =>  
  17.       if (writer != null) {  
  18.         writer.stop(success = false)  
  19.       }  
  20.       throw e  
  21.   } finally {  
  22.     context.markTaskCompleted()  
  23.   }  


这 两个task都不要按照拓扑顺序调用rdd的compute来完成对partition的计算,不同的是ShuffleMapTask需要shuffle write,以供child stage读取shuffle的结果。 对于这两个task都用到的taskBinary,即为在 org.apache.spark.scheduler.DAGScheduler#submitMissingTasks序列化的task的广播变量取 得的。

Spark技术内幕: Task向Executor提交的源码解析

标签:des   style   http   color   io   os   ar   java   for   

原文地址:http://www.cnblogs.com/yido9932/p/4053371.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!