码迷,mamicode.com
首页 > 其他好文 > 详细

【Spark】DAGScheduler源码浅析2

时间:2015-07-15 19:19:35      阅读:165      评论:0      收藏:0      [点我收藏+]

标签:spark

引入

上一篇文章DAGScheduler源码浅析主要从提交Job的流程角度介绍了DAGScheduler源码中的重要函数和关键点,这篇DAGScheduler源码浅析2主要参考fxjwind的Spark源码分析 – DAGScheduler一文,介绍一下DAGScheduler文件中之前没有介绍的几个重要函数。

事件处理

在Spark 1.0版本之前,在DAGScheduler类中加入eventQueue私有成员,设置eventLoop Thread循环读取事件进行处理。在Spark 1.0源码中,事件处理通过Actor的方式进行,涉及的DAGEventProcessActor类进行主要的事件处理工作。
可能由于scala不再支持原生actor方式,而将akka actor作为官方标准的原因,在我查看Spark 1.4的源码中,DAGScheduler重新采用eventQueue的方式进行事件处理,为了代码逻辑更加清晰,耦合性更小,1.4的源码中编写了DAGSchedulerEventProcessLoop类进行事件处理。

private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

这里DAGSchedulerEventProcessLoop继承了EventLoop类,其中:

private[spark] abstract class EventLoop[E](name: String) extends Logging {

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) => {
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
            }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }

  }

  ......

我们可以看到,DAGScheduler通过向DAGSchedulerEventProcessLoop对象投递event,即向eventQueue发送event,eventThread不断从eventQueue中获取event并调用onReceive函数进行处理。

  override def onReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties)

  ......

JobWaiter

JobWaiter首先实现JobListener的taskSucceeded和jobFailed函数,当DAGScheduler收到tasksuccess或fail的event就会调用相应的函数在tasksuccess会判断当所有task都success时,就表示jobFinished而awaitResult,就是一直等待jobFinished被置位。
可以看到在submitJob函数中创建了JobWaiter实例,作为参数传入的事件实例中,最终在调用handleJobSubmitted函数中,如果发生错误,就会调用JobWaiter的jobFailed函数。

下面是JobWaiter类的代码:

private[spark] class JobWaiter[T](
    dagScheduler: DAGScheduler,
    val jobId: Int,
    totalTasks: Int,
    resultHandler: (Int, T) => Unit)
  extends JobListener {

  private var finishedTasks = 0

  // Is the job as a whole finished (succeeded or failed)?
  @volatile
  private var _jobFinished = totalTasks == 0

  def jobFinished = _jobFinished

  // If the job is finished, this will be its result. In the case of 0 task jobs (e.g. zero
  // partition RDDs), we set the jobResult directly to JobSucceeded.
  private var jobResult: JobResult = if (jobFinished) JobSucceeded else null

  /**
   * Sends a signal to the DAGScheduler to cancel the job. The cancellation itself is handled
   * asynchronously. After the low level scheduler cancels all the tasks belonging to this job, it
   * will fail this job with a SparkException.
   */
  def cancel() {
    dagScheduler.cancelJob(jobId)
  }

  override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
    if (_jobFinished) {
      throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
    }
    resultHandler(index, result.asInstanceOf[T])
    finishedTasks += 1
    if (finishedTasks == totalTasks) {
      _jobFinished = true
      jobResult = JobSucceeded
      this.notifyAll()
    }
  }

  override def jobFailed(exception: Exception): Unit = synchronized {
    _jobFinished = true
    jobResult = JobFailed(exception)
    this.notifyAll()
  }

  def awaitResult(): JobResult = synchronized {
    while (!_jobFinished) {
      this.wait()
    }
    return jobResult
  }
}

小结

这一小节内容介绍了DAGScheduler.scala文件中的几个小细节,下一篇文章我会就DAGScheduler.scala文件中stage划分和依赖性进行分析介绍。

转载请注明作者Jason Ding及其出处
GitCafe博客主页(http://jasonding1354.gitcafe.io/)
Github博客主页(http://jasonding1354.github.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
Google搜索jasonding1354进入我的博客主页

版权声明:本文为博主原创文章,未经博主允许不得转载。

【Spark】DAGScheduler源码浅析2

标签:spark

原文地址:http://blog.csdn.net/jasonding1354/article/details/46896341

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!