Tags: spark, spark core, source code
Actual tasks are run by the Executor class. This section covers only Standalone mode.
Source location: org.apache.spark.executor.CoarseGrainedExecutorBackend
```scala
private def run(
    driverUrl: String,
    executorId: String,
    hostname: String,
    cores: Int,
    appId: String,
    workerUrl: Option[String],
    userClassPath: Seq[URL]) {

  SignalLogger.register(log)

  SparkHadoopUtil.get.runAsSparkUser { () =>
    // Debug code
    Utils.checkHost(hostname)

    // Bootstrap to fetch the driver's Spark properties.
    val executorConf = new SparkConf // the executor-side SparkConf
    val port = executorConf.getInt("spark.executor.port", 0)
    // create the RpcEnv (Akka-based here; it wraps the actorSystem)
    val fetcher = RpcEnv.create(
      "driverPropsFetcher",
      hostname,
      port,
      executorConf,
      new SecurityManager(executorConf))
    // obtain a reference to the driver endpoint (the driver's ActorRef)
    val driver = fetcher.setupEndpointRefByURI(driverUrl)
    val props = driver.askWithRetry[Seq[(String, String)]](RetrieveSparkProps) ++
      Seq[(String, String)](("spark.app.id", appId))
    fetcher.shutdown()

    // Create SparkEnv using properties we fetched from the driver.
    val driverConf = new SparkConf() // rebuild the driver's SparkConf locally
    for ((key, value) <- props) {
      // this is required for SSL in standalone mode
      if (SparkConf.isExecutorStartupConf(key)) {
        driverConf.setIfMissing(key, value)
      } else {
        driverConf.set(key, value)
      }
    }
    if (driverConf.contains("spark.yarn.credentials.file")) {
      logInfo("Will periodically update credentials from: " +
        driverConf.get("spark.yarn.credentials.file"))
      SparkHadoopUtil.get.startExecutorDelegationTokenRenewer(driverConf)
    }

    // create the executor-side SparkEnv, analyzed below
    val env = SparkEnv.createExecutorEnv(
      driverConf, executorId, hostname, port, cores, isLocal = false)

    // SparkEnv sets spark.driver.port so it shouldn't be 0 anymore.
    val boundPort = env.conf.getInt("spark.executor.port", 0)
    assert(boundPort != 0)

    // Start the CoarseGrainedExecutorBackend endpoint.
    val sparkHostPort = hostname + ":" + boundPort
    // create the executor endpoint; its onStart registers the executor with the driver, see below
    env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
      env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env))
    // WorkerWatcher connects to the worker process and exits this JVM if that connection is lost
    workerUrl.foreach { url =>
      env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
    }
    env.rpcEnv.awaitTermination()
    SparkHadoopUtil.get.stopExecutorDelegationTokenRenewer()
  }
}
```

Let's first look at createExecutorEnv. It is almost identical to the driver-side version covered earlier, so only the differences from the driver are listed here (a simplified sketch of the pattern they share follows the list):
1. On the executor side, mapOutputTracker is a MapOutputTrackerWorker object, and mapOutputTracker.trackerEndpoint actually refers to the driver's ActorRef.
2. blockManagerMaster likewise holds the driver's ActorRef internally.
3. outputCommitCoordinator.coordinatorRef also wraps the driver's ActorRef.
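All three components have the same shape: the executor-side object holds only a reference to an endpoint that lives on the driver and forwards its queries there. The sketch below is a minimal, self-contained illustration of that pattern, not Spark's real API; DriverRef, MapOutputProxy and the message tuple are made-up names.

```scala
object DriverRefPatternSketch {

  // Stand-in for an RpcEndpointRef that points at an endpoint living on the driver.
  trait DriverRef {
    def ask(message: Any): Any
  }

  // Executor-side "tracker": keeps no authoritative state of its own, only the driver reference.
  class MapOutputProxy(driverRef: DriverRef) {
    def getMapOutputLocations(shuffleId: Int): Seq[String] =
      // the real MapOutputTrackerWorker would additionally cache the answer locally
      driverRef.ask(("GetMapOutputStatuses", shuffleId)).asInstanceOf[Seq[String]]
  }

  def main(args: Array[String]): Unit = {
    // Fake driver endpoint that answers every query with a canned list of locations.
    val fakeDriver = new DriverRef {
      def ask(message: Any): Any = Seq("host-a:7337", "host-b:7337")
    }
    val proxy = new MapOutputProxy(fakeDriver)
    println(proxy.getMapOutputLocations(shuffleId = 0)) // List(host-a:7337, host-b:7337)
  }
}
```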
Now let's look at the onStart method of CoarseGrainedExecutorBackend and see what it does on its own initiative.
It sends a RegisterExecutor message to the driver to register the executor; once the driver replies successfully, it sends a RegisteredExecutor message to itself.
```scala
override def onStart() {
  logInfo("Connecting to driver: " + driverUrl)
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    driver = Some(ref)
    ref.ask[RegisteredExecutor.type](
      RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    case Success(msg) => Utils.tryLogNonFatalError {
      Option(self).foreach(_.send(msg)) // msg must be RegisteredExecutor
    }
    case Failure(e) => logError(s"Cannot register with driver: $driverUrl", e)
  }(ThreadUtils.sameThread)
}
```

How does the driver handle this message? Pay special attention to the makeOffers call at the end of the handler: once an executor has registered, any tasks that are already waiting to run can start. makeOffers is used again later, and since the task-scheduling chapter has not been reached yet, its details are deferred; for now it is enough to know that an executor registering triggers a round of task scheduling (if there are tasks to run). The sketch below previews the idea; the real driver-side handler follows it.
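This is only a rough, self-contained sketch of the resource-offer idea, not the real CoarseGrainedSchedulerBackend.makeOffers; ExecutorInfo, Offer, assign and the one-core-per-task policy are simplifications made up for illustration.

```scala
object MakeOffersSketch {
  case class ExecutorInfo(host: String, var freeCores: Int)
  case class Offer(executorId: String, host: String, cores: Int)
  case class TaskDesc(taskId: Int, executorId: String)

  val executors = scala.collection.mutable.Map[String, ExecutorInfo]()
  val pendingTasks = scala.collection.mutable.Queue[Int]()

  // Very naive "scheduler": one core per task, first offer with a free core wins.
  def assign(offers: Seq[Offer]): Seq[TaskDesc] =
    offers.flatMap { offer =>
      (1 to offer.cores).flatMap { _ =>
        if (pendingTasks.nonEmpty) Some(TaskDesc(pendingTasks.dequeue(), offer.executorId))
        else None
      }
    }

  def makeOffers(): Unit = {
    // offer the free cores of every registered executor to the scheduler
    val offers = executors.toSeq.map { case (id, info) => Offer(id, info.host, info.freeCores) }
    val launched = assign(offers)
    launched.foreach { task =>
      executors(task.executorId).freeCores -= 1
      println(s"launching task ${task.taskId} on ${task.executorId}")
    }
  }

  def main(args: Array[String]): Unit = {
    pendingTasks ++= Seq(0, 1, 2)                       // tasks waiting before any executor exists
    executors("executor-1") = ExecutorInfo("host-a", 2) // a new executor registers ...
    makeOffers()                                        // ... which triggers a round of scheduling
  }
}
```

In the real code, makeOffers essentially builds WorkerOffer objects from executorDataMap, hands them to the task scheduler's resourceOffers, and launches the task descriptions that come back.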
Here is the driver-side handler for RegisterExecutor:

```scala
case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
  Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
  if (executorDataMap.contains(executorId)) {
    context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
  } else {
    logInfo("Registered executor: " + executorRef + " with ID " + executorId)
    context.reply(RegisteredExecutor) // reply RegisteredExecutor back to the executor
    addressToExecutorId(executorRef.address) = executorId
    totalCoreCount.addAndGet(cores) // add to the total core count for every successfully registered executor
    totalRegisteredExecutors.addAndGet(1)
    val (host, _) = Utils.parseHostPort(hostPort)
    val data = new ExecutorData(executorRef, executorRef.address, host, cores, cores, logUrls)
    // This must be synchronized because variables mutated
    // in this block are read when requesting executors
    CoarseGrainedSchedulerBackend.this.synchronized {
      executorDataMap.put(executorId, data)
      if (numPendingExecutors > 0) {
        numPendingExecutors -= 1
        logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
      }
    }
    listenerBus.post(
      SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
    makeOffers()
  }
```

When the executor side receives the RegisteredExecutor reply, it creates the real Executor object. The Executor class is the interface that actually runs tasks; it maintains all the tasks running in this executor process (a simplified sketch of that idea follows below).

```scala
case RegisteredExecutor =>
  logInfo("Successfully registered with driver")
  val (hostname, _) = Utils.parseHostPort(hostPort)
  executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
```

That concludes the registration logic on the executor side; the remaining pieces will be covered later together with actual task execution.
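To make "maintains all the tasks of this executor process" concrete, here is a simplified, self-contained sketch of the idea, not the real org.apache.spark.executor.Executor; MiniExecutor and MiniTaskRunner are made-up names. Each launched task becomes a Runnable submitted to a thread pool, and a concurrent map tracks the tasks that are currently running.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors}

class MiniExecutor {
  private val threadPool = Executors.newCachedThreadPool()
  private val runningTasks = new ConcurrentHashMap[Long, MiniTaskRunner]()

  def launchTask(taskId: Long, body: () => Unit): Unit = {
    val runner = new MiniTaskRunner(taskId, body)
    runningTasks.put(taskId, runner) // remember the task while it runs
    threadPool.execute(runner)
  }

  class MiniTaskRunner(taskId: Long, body: () => Unit) extends Runnable {
    override def run(): Unit =
      try body()
      finally runningTasks.remove(taskId) // forget it once it finishes
  }

  def shutdown(): Unit = threadPool.shutdown()
}

object MiniExecutorDemo {
  def main(args: Array[String]): Unit = {
    val executor = new MiniExecutor
    executor.launchTask(0L, () => println("running task 0"))
    executor.launchTask(1L, () => println("running task 1"))
    executor.shutdown() // let the already-submitted tasks finish, then exit
  }
}
```

The real Executor's TaskRunner additionally deserializes the task, reports status updates back to the driver through the backend, and handles kill requests.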
Original article: http://blog.csdn.net/yueqian_zhu/article/details/48010137