码迷,mamicode.com
首页 > 其他好文 > 详细

Spark Deploy 模块

时间:2015-10-06 14:06:51      阅读:369      评论:0      收藏:0      [点我收藏+]

标签:

Spark Scheduler 模块的文章中,介绍到 Spark 将底层的资源管理和上层的任务调度分离开来,一般而言,底层的资源管理会使用第三方的平台,如 YARN 和 Mesos。为了方便用户测试和使用,Spark 也单独实现了一个简单的资源管理平台,也就是本文介绍的 Deploy 模块。

一些有经验的读者已经使用过该功能。

Spark RPC 的实现

细心的读者在阅读 Scheduler 相关代码时,已经注意到很多地方使用了 RPC 的方式通讯,比如 driver 和 executor 之间传递消息。

在旧版本的 Spark 中,直接使用了 akka.Actor 作为并发通讯的基础。很多模块是继承于 akka.Actor 的。为了剥离对 akka 的依赖性, Spark 抽象出一个独立的模块,org.apache.spark.rpc。里面定义了 RpcEndpoint 和 RpcEndpointRef,与 Actor 和 ActorRef 的意义和作用一模一样。并且该 RPC 模块仅有一个实现 org.apache.spark.rpc.akka。所以其通讯方式依然使用了 akka。优势是接口已经抽象出来,随时可以用其他方案替换 akka。

Spark 的风格似乎就是这样,什么都喜欢自己实现,包括调度、存储、shuffle,和刚推出的 Tungsten 项目(自己管理内存,而非 JVM 托管)。

Deploy 模块的整体架构

deploy 木块主要包括三个模块:master, worker, client。

Master:集群的管理者,接受 worker 的注册,接受 client 提交的 application,调度所有的 application。

Worker:一个 worker 上有多个 ExecutorRunner,这些 executor 是真正运行 task 的地方。worker 启动时,会向 master 注册自己。

Client:向 master 提交和监控 application。

架构与 MapReduce V1 比较相似。

代码详解

启动 master 和 worker

object org.apache.spark.deploy.master.Master 中,有 master 启动的 main 函数:

private[deploy] object Master extends Logging {
  val SYSTEM_NAME = "sparkMaster"
  val ENDPOINT_NAME = "Master"

  def main(argStrings: Array[String]) {
    SignalLogger.register(log)
    val conf = new SparkConf
    val args = new MasterArguments(argStrings, conf)
    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
    rpcEnv.awaitTermination()
  }

  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf)) // 启动 Master 和 master RPC
    val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  }
}

这里最主要的一行是:

    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf)) // 启动 Master 的 RPC

Master 继承于 RpcEndpoint,所以这里启动工作,都是在 Master.onStart 中完成,主要是启动了 restful 的 http 服务,用于展示状态。

 

object org.apache.spark.deploy.worker.Worker 中,有 worker 启动的 main 函数:

private[deploy] object Worker extends Logging {
  val SYSTEM_NAME = "sparkWorker"
  val ENDPOINT_NAME = "Worker"

  // 需要传入 master 的 url
  def main(argStrings: Array[String]) {
    SignalLogger.register(log)
    val conf = new SparkConf
    val args = new WorkerArguments(argStrings, conf)
    val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
      args.memory, args.masters, args.workDir)
    rpcEnv.awaitTermination()
  }

  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      cores: Int,
      memory: Int,
      masterUrls: Array[String],
      workDir: String,
      workerNumber: Option[Int] = None,
      conf: SparkConf = new SparkConf): RpcEnv = {

    // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
    val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
    val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
    rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
      masterAddresses, systemName, ENDPOINT_NAME, workDir, conf, securityMgr)) // 启动 Worker
    rpcEnv
  }
  ...
}

worker 启动方式与 master 非常相似。然后

override def onStart() {
  assert(!registered)
  createWorkDir()  // 创建工作目录
  shuffleService.startIfEnabled() // 启动 shuffle 服务
  webUi = new WorkerWebUI(this, workDir, webUiPort) // 驱动 web 服务
  webUi.bind()
  registerWithMaster()  // 向 master 注册自己

  metricsSystem.registerSource(workerSource) // 这侧 worker 的资源
  metricsSystem.start()
  metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}
private def registerWithMaster() {
  registrationRetryTimer match {
    case None =>
      registered = false
      registerMasterFutures = tryRegisterAllMasters()
      connectionAttemptCount = 0
      registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate( // 不断向 master 注册,直到成功
        new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(ReregisterWithMaster)
          }
        },
        INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
        INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
        TimeUnit.SECONDS))
    ...
  }
}


override def receive: PartialFunction[Any, Unit] = {
  case RegisteredWorker(masterRef, masterWebUiUrl) =>  // master 告知 worker 已经注册成功
    logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
    registered = true
    changeMaster(masterRef, masterWebUiUrl)
    forwordMessageScheduler.scheduleAtFixedRate(new Runnable { // worker 不断向 master 发送心跳
      override def run(): Unit = Utils.tryLogNonFatalError {
        self.send(SendHeartbeat)
      }
    }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
  ...
}

如此,master 和 worker 使用心跳的方式一直保持连接。

client 提交 application

dd

application 结束

ddd

总结

dd

Spark Deploy 模块

标签:

原文地址:http://www.cnblogs.com/keepthinking/p/4857015.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!