码迷,mamicode.com
首页 > 其他好文 > 详细

Spark学习之1:Master启动流程

时间:2015-04-20 13:06:17      阅读:177      评论:0      收藏:0      [点我收藏+]

标签:

1. 启动脚本

sbin/start-master.sh

  1. "$sbin"/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT
参数:

(1)SPARK_MASTER_IP

(2)SPARK_MASTER_PORT

(3)SPARK_MASTER_WEBUI_PORT

Master类最终会通过bin/spark-class脚本启动。

其中的参数“1”用于表示master编号,在生成日志文件时起作用,并不会传入Master类。

  1. spark-xxx-org.apache.spark.deploy.master.Master-1-CentOS-01.out
  2. spark-xxx-org.apache.spark.deploy.master.Master-1.pid
其中“Master-1”中的“1”就是master编号。

2. Master.main

  1. def main(argStrings: Array[String]) {
  2. SignalLogger.register(log)
  3. val conf = new SparkConf
  4. val args = new MasterArguments(argStrings, conf)
  5. val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
  6. actorSystem.awaitTermination()
  7. }
main函数的职责:

(1)创建MasterArguments对象并初始化其成员;

(2)调用startSystemAndActor方法,创建ActorSystem对象并启动Master actor

2.1. MasterArguments

  1. parse(args.toList)
  2. // This mutates the SparkConf, so all accesses to it must be made after this line
  3. propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)

(1)parse方法负责解析启动脚本所带的命令行参数;

(2)loadDefaultSparkProperties负责从配置文件中加载spark运行属性,默认而配置文件为spark-defaults.conf

2.2. startSystemAndActor

  1. val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
  2. securityManager = securityMgr)
  3. val actor = actorSystem.actorOf(
  4. Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)

(1)通过AkkaUtils.createActorSystem创建ActorSystem对象

(2)创建Master actor并启动

3. Master Actor

3.1. 重要数据成员

  1. val workers = new HashSet[WorkerInfo]
  2. val idToWorker = new HashMap[String, WorkerInfo]
  3. val addressToWorker = new HashMap[Address, WorkerInfo]
  4. val apps = new HashSet[ApplicationInfo]
  5. val idToApp = new HashMap[String, ApplicationInfo]
  6. val actorToApp = new HashMap[ActorRef, ApplicationInfo]
  7. val addressToApp = new HashMap[Address, ApplicationInfo]
  8. val waitingApps = new ArrayBuffer[ApplicationInfo]
  9. val completedApps = new ArrayBuffer[ApplicationInfo]
  10. var nextAppNumber = 0
  11. val appIdToUI = new HashMap[String, SparkUI]
  12. val drivers = new HashSet[DriverInfo]
  13. val completedDrivers = new ArrayBuffer[DriverInfo]
  14. val waitingDrivers = new ArrayBuffer[DriverInfo] // Drivers currently spooled for scheduling

3.2. Master.preStart

  1. // Listen for remote client disconnection events, since they don‘t go through Akka‘s watch()
  2. context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

监听RemotingLifecycleEvent事件,它一个trait

  1. sealed trait RemotingLifecycleEvent extends Serializable {
  2. def logLevel: Logging.LogLevel
  3. }

Master只处理了DisassociatedEvent消息。

  1. context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)

启动定时器,检查Worker超时;以Work超时时间为周期,向Master发送CheckForWorkerTimeOut消息;默认超时时间为60秒,可通过spark.worker.timeout属性设置。

  1. val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
  2. case "ZOOKEEPER" =>
  3. logInfo("Persisting recovery state to ZooKeeper")
  4. val zkFactory =
  5. new ZooKeeperRecoveryModeFactory(conf, SerializationExtension(context.system))
  6. (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
  7. case "FILESYSTEM" =>
  8. val fsFactory =
  9. new FileSystemRecoveryModeFactory(conf, SerializationExtension(context.system))
  10. (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
  11. case "CUSTOM" =>
  12. val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
  13. val factory = clazz.getConstructor(conf.getClass, Serialization.getClass)
  14. .newInstance(conf, SerializationExtension(context.system))
  15. .asInstanceOf[StandaloneRecoveryModeFactory]
  16. (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
  17. case _ =>
  18. (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
  19. }

根据RECOVERY_MODE创建持久化引擎和领导选择代理。RECOVERY_MODE默认值为NONE,通过spark.deploy.recoveryMode进行配置。

假设RECOVERY_MODE值为NONE

(1)创建BlackHolePersistenceEngine对象,不做任何持久化操作;

(2)创建MonarchyLeaderAgent对象,其主构造函数将向Master发送ElectedLeader消息

3.3. Master消息处理

3.3.1. ElectedLeader消息

  1. case ElectedLeader => {
  2. val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
  3. state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
  4. RecoveryState.ALIVE
  5. } else {
  6. RecoveryState.RECOVERING
  7. }
  8. logInfo("I have been elected leader! New state: " + state)
  9. if (state == RecoveryState.RECOVERING) {
  10. beginRecovery(storedApps, storedDrivers, storedWorkers)
  11. recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self,
  12. CompleteRecovery)
  13. }
  14. }

前面假设RECOVERY_MODE值为NONE所以不执行任何recovery操作,直接将state设置为RecoveryState.ALIVE

3.3.2. CheckForWorkerTimeOut消息

  1. case CheckForWorkerTimeOut => {
  2. timeOutDeadWorkers()
  3. }

检查超时Worker节点。Worker节点超时时间默认为60秒,通过spark.worker.timeout属性设置。

3.3.3. DisassociatedEvent消息

  1. case DisassociatedEvent(_, address, _) => {
  2. // The disconnected client could‘ve been either a worker or an app; remove whichever it was
  3. logInfo(s"$address got disassociated, removing it.")
  4. addressToWorker.get(address).foreach(removeWorker)
  5. addressToApp.get(address).foreach(finishApplication)
  6. if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
  7. }

3.3.4. RegisterWorker消息

这是WorkerMaster之间的注册消息。

  1. val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
  2. sender, workerUiPort, publicAddress)
  3. if (registerWorker(worker)) {
  4. persistenceEngine.addWorker(worker)
  5. sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
  6. schedule()
  7. }

(1)创建WorkerInfo对象;

(2)调用registerWorker方法,记录Worker信息;

(3)Worker发送RegisteredWorker消息;

(4)调用schedule方法,该方法的职责是为DriverApp分配资源。

3.3.4.1. WorkerInfo

  1. private[spark] class WorkerInfo(
  2. val id: String,
  3. val host: String,
  4. val port: Int,
  5. val cores: Int,
  6. val memory: Int,
  7. val actor: ActorRef,
  8. val webUiPort: Int,
  9. val publicAddress: String)
  10. extends Serializable {
  11. ...
  12. init()
  13. ...
  14. private def init() {
  15. executors = new mutable.HashMap
  16. drivers = new mutable.HashMap
  17. state = WorkerState.ALIVE
  18. coresUsed = 0
  19. memoryUsed = 0
  20. lastHeartbeat = System.currentTimeMillis()
  21. }

创建WorkerInfo对象,并调用init进行初始化。

3.3.4.2. Master.registerWorker

  1. workers.filter { w =>
  2. (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
  3. }.foreach { w =>
  4. workers -= w
  5. }

移除状态位DEADWorkerInfo

  1. val workerAddress = worker.actor.path.address
  2. if (addressToWorker.contains(workerAddress)) {
  3. val oldWorker = addressToWorker(workerAddress)
  4. if (oldWorker.state == WorkerState.UNKNOWN) {
  5. // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
  6. // The old worker must thus be dead, so we will remove it and accept the new worker.
  7. removeWorker(oldWorker)
  8. } else {
  9. logInfo("Attempted to re-register worker at same address: " + workerAddress)
  10. return false
  11. }
  12. }
  13. workers += worker
  14. idToWorker(worker.id) = worker
  15. addressToWorker(workerAddress) = worker

记录WorkInfo信息至workersidToWorkeraddressToWorker

4. 启动结束

到此,启动过程就完成了。

接下来开始等待workerdriver消息请求。

 





Spark学习之1:Master启动流程

标签:

原文地址:http://www.cnblogs.com/linker1119/p/4441118.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!