标签:
Spark Scheduler 模块的文章中,介绍到 Spark 将底层的资源管理和上层的任务调度分离开来,一般而言,底层的资源管理会使用第三方的平台,如 YARN 和 Mesos。为了方便用户测试和使用,Spark 也单独实现了一个简单的资源管理平台,也就是本文介绍的 Deploy 模块。
一些有经验的读者已经使用过该功能。
Spark RPC 的实现
细心的读者在阅读 Scheduler 相关代码时,已经注意到很多地方使用了 RPC 的方式通讯,比如 driver 和 executor 之间传递消息。
在旧版本的 Spark 中,直接使用了 akka.Actor 作为并发通讯的基础。很多模块是继承于 akka.Actor 的。为了剥离对 akka 的依赖性, Spark 抽象出一个独立的模块,org.apache.spark.rpc。里面定义了 RpcEndpoint 和 RpcEndpointRef,与 Actor 和 ActorRef 的意义和作用一模一样。并且该 RPC 模块仅有一个实现 org.apache.spark.rpc.akka。所以其通讯方式依然使用了 akka。优势是接口已经抽象出来,随时可以用其他方案替换 akka。
Spark 的风格似乎就是这样,什么都喜欢自己实现,包括调度、存储、shuffle,和刚推出的 Tungsten 项目(自己管理内存,而非 JVM 托管)。
Deploy 模块的整体架构
deploy 木块主要包括三个模块:master, worker, client。
Master:集群的管理者,接受 worker 的注册,接受 client 提交的 application,调度所有的 application。
Worker:一个 worker 上有多个 ExecutorRunner,这些 executor 是真正运行 task 的地方。worker 启动时,会向 master 注册自己。
Client:向 master 提交和监控 application。
架构与 MapReduce V1 比较相似。
代码详解
启动 master 和 worker
object org.apache.spark.deploy.master.Master 中,有 master 启动的 main 函数:
private[deploy] object Master extends Logging { val SYSTEM_NAME = "sparkMaster" val ENDPOINT_NAME = "Master" def main(argStrings: Array[String]) { SignalLogger.register(log) val conf = new SparkConf val args = new MasterArguments(argStrings, conf) val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf) rpcEnv.awaitTermination() } def startRpcEnvAndEndpoint( host: String, port: Int, webUiPort: Int, conf: SparkConf): (RpcEnv, Int, Option[Int]) = { val securityMgr = new SecurityManager(conf) val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr) val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf)) // 启动 Master 和 master RPC val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest) (rpcEnv, portsResponse.webUIPort, portsResponse.restPort) } }
这里最主要的一行是:
val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf)) // 启动 Master 的 RPC
Master 继承于 RpcEndpoint,所以这里启动工作,都是在 Master.onStart 中完成,主要是启动了 restful 的 http 服务,用于展示状态。
object org.apache.spark.deploy.worker.Worker 中,有 worker 启动的 main 函数:
private[deploy] object Worker extends Logging { val SYSTEM_NAME = "sparkWorker" val ENDPOINT_NAME = "Worker" // 需要传入 master 的 url def main(argStrings: Array[String]) { SignalLogger.register(log) val conf = new SparkConf val args = new WorkerArguments(argStrings, conf) val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores, args.memory, args.masters, args.workDir) rpcEnv.awaitTermination() } def startRpcEnvAndEndpoint( host: String, port: Int, webUiPort: Int, cores: Int, memory: Int, masterUrls: Array[String], workDir: String, workerNumber: Option[Int] = None, conf: SparkConf = new SparkConf): RpcEnv = { // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("") val securityMgr = new SecurityManager(conf) val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr) val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_)) rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory, masterAddresses, systemName, ENDPOINT_NAME, workDir, conf, securityMgr)) // 启动 Worker rpcEnv } ... }
worker 启动方式与 master 非常相似。然后
override def onStart() { assert(!registered) createWorkDir() // 创建工作目录 shuffleService.startIfEnabled() // 启动 shuffle 服务 webUi = new WorkerWebUI(this, workDir, webUiPort) // 驱动 web 服务 webUi.bind() registerWithMaster() // 向 master 注册自己 metricsSystem.registerSource(workerSource) // 这侧 worker 的资源 metricsSystem.start() metricsSystem.getServletHandlers.foreach(webUi.attachHandler) }
private def registerWithMaster() { registrationRetryTimer match { case None => registered = false registerMasterFutures = tryRegisterAllMasters() connectionAttemptCount = 0 registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate( // 不断向 master 注册,直到成功 new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { self.send(ReregisterWithMaster) } }, INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS, INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS, TimeUnit.SECONDS)) ... } } override def receive: PartialFunction[Any, Unit] = { case RegisteredWorker(masterRef, masterWebUiUrl) => // master 告知 worker 已经注册成功 logInfo("Successfully registered with master " + masterRef.address.toSparkURL) registered = true changeMaster(masterRef, masterWebUiUrl) forwordMessageScheduler.scheduleAtFixedRate(new Runnable { // worker 不断向 master 发送心跳 override def run(): Unit = Utils.tryLogNonFatalError { self.send(SendHeartbeat) } }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS) ... }
如此,master 和 worker 使用心跳的方式一直保持连接。
client 提交 application
dd
application 结束
ddd
总结
dd
标签:
原文地址:http://www.cnblogs.com/keepthinking/p/4857015.html