标签:回复消息 vertica 操作 state ret catch erro 时间段 exe
In order to enable this recovery mode, you can set SPARK_DAEMON_JAVA_OPTS in spark-env using this configuration:
System property | Meaning |
---|---|
spark.deploy.recoveryMode |
Set to FILESYSTEM to enable single-node recovery mode (default: NONE). |
spark.deploy.recoveryDirectory |
The directory in which Spark will store recovery state, accessible from the Master‘s perspective. |
-Dspark.deploy.recoveryMode=ZOOKEEPER
override def isLeader() { synchronized { // could have lost leadership by now. if (!leaderLatch.hasLeadership) { return } logInfo("We have gained leadership") updateLeadershipStatus(true) } } override def notLeader() { synchronized { // could have gained leadership by now. if (leaderLatch.hasLeadership) { return } logInfo("We have lost leadership") updateLeadershipStatus(false) } }
private def updateLeadershipStatus(isLeader: Boolean) { if (isLeader && status == LeadershipStatus.NOT_LEADER) { status = LeadershipStatus.LEADER masterInstance.electedLeader() } else if (!isLeader && status == LeadershipStatus.LEADER) { status = LeadershipStatus.NOT_LEADER masterInstance.revokedLeadership() } }
override def electedLeader() { self.send(ElectedLeader) } override def revokedLeadership() { self.send(RevokedLeadership) }
ls /spark/leader_election [_c_6b494d00-cf91-4d69-8828-942377bafdf1-latch-0000000217, _c_c1f4c10f-9479-4a55-b660-0497c0b54f89-latch-0000000218]
case ElectedLeader => val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv) state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) { RecoveryState.ALIVE } else { RecoveryState.RECOVERING } logInfo("I have been elected leader! New state: " + state) if (state == RecoveryState.RECOVERING) { beginRecovery(storedApps, storedDrivers, storedWorkers) recoveryCompletionTask = forwardMessageThread.schedule(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { self.send(CompleteRecovery) } }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS) }
ls /spark/master_status [worker_worker-20170329184022-192.168.121.101-55109, worker_worker-20170330035822-192.168.121.102-41307]
private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo], storedWorkers: Seq[WorkerInfo]) { for (app <- storedApps) { logInfo("Trying to recover app: " + app.id) try { registerApplication(app) app.state = ApplicationState.UNKNOWN app.driver.send(MasterChanged(self, masterWebUiUrl)) } catch { case e: Exception => logInfo("App " + app.id + " had exception on reconnect") } } for (driver <- storedDrivers) { // Here we just read in the list of drivers. Any drivers associated with now-lost workers // will be re-launched when we detect that the worker is missing. drivers += driver } for (worker <- storedWorkers) { logInfo("Trying to recover worker: " + worker.id) try { registerWorker(worker) worker.state = WorkerState.UNKNOWN worker.endpoint.send(MasterChanged(self, masterWebUiUrl)) } catch { case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect") } } }
private def completeRecovery() { // Ensure "only-once" recovery semantics using a short synchronization period. if (state != RecoveryState.RECOVERING) { return } state = RecoveryState.COMPLETING_RECOVERY // Kill off any workers and apps that didn‘t respond to us. workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker) apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication) // Reschedule drivers which were not claimed by any workers drivers.filter(_.worker.isEmpty).foreach { d => logWarning(s"Driver ${d.id} was not found after master recovery") if (d.desc.supervise) { logWarning(s"Re-launching ${d.id}") relaunchDriver(d) } else { removeDriver(d.id, DriverState.ERROR, None) logWarning(s"Did not re-launch ${d.id} because it was not supervised") } } state = RecoveryState.ALIVE schedule() logInfo("Recovery complete - resuming operations!") }
recoveryCompletionTask = forwardMessageThread.schedule(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { self.send(CompleteRecovery) } }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
RecoveryState.ALIVE RecoveryState.RECOVERING RecoveryState.COMPLETING_RECOVERY RecoveryState.STANDBYALIVE ----- 活着
--master spark://raintungmaster:7077,raintungslave1:7077
大数据:Spark Standalone 集群调度(三)多Master节点的可用性
标签:回复消息 vertica 操作 state ret catch erro 时间段 exe
原文地址:http://blog.csdn.net/raintungli/article/details/68941173