Tags: bash, job, executor
1 spark-shell → spark-submit → spark-class (main class: SparkSubmit)
2 JVM starts → a thread dump shows main: SparkSubmit.main → repl.Main → SparkILoop.process: (initializeSpark → createSparkContext)
3 SparkContext: (1 Utils.getCallSite, 2 markPartiallyConstructed(this, config.getBoolean("spark.driver.allowMultipleContexts", false)), 3)
1 SparkConf → SparkEnv
2 TaskScheduler
3 dagScheduler.start()
4 ui.start()
1 SparkContext: create the instance sc
2 RDD: create an RDD using sc
3 apply a transformation to the RDD to get a different RDD
4 action: an action on the RDD calls sc.runJob
5 sc.runJob → dagScheduler.runJob → submitJob; submitJob creates a JobSubmitted event and sends it to eventProcessActor
6 eventProcessActor.processEvent
7 submitStage: the job is split into stages; get the dependencies and the final stage, then submit the final stage and run
8 submitMissingTasks
9 TaskScheduler::submitTasks assigns tasks to workers
10 TaskSchedulerImpl creates the backend according to the deploy mode; the backend receives ReviveOffers from TaskSchedulerImpl
11 ReviveOffers → executor.launchTask → TaskRunner.run
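The submission path above (action → runJob → stage split at shuffle boundaries → task launch) can be sketched with a toy Python model. The class and function names here are illustrative only, not Spark's actual code; the point is how DAGScheduler cuts the lineage into stages at wide (shuffle) dependencies.

```python
# Toy lineage: each "RDD" records its parent and whether reaching it
# requires a shuffle (a wide dependency). Stage boundaries are drawn
# at shuffle dependencies, which is what DAGScheduler does.
class RDD:
    def __init__(self, name, parent=None, shuffle=False):
        self.name, self.parent, self.shuffle = name, parent, shuffle

def split_into_stages(final_rdd):
    """Walk the lineage backwards; start a new stage at every shuffle."""
    stages, current = [], []
    rdd = final_rdd
    while rdd is not None:
        current.append(rdd.name)
        if rdd.shuffle and rdd.parent is not None:
            stages.append(list(reversed(current)))
            current = []
        rdd = rdd.parent
    stages.append(list(reversed(current)))
    return list(reversed(stages))  # parent stages run first

# word-count-like lineage: textFile -> flatMap -> map -> reduceByKey
lines  = RDD("HadoopRDD")
words  = RDD("FlatMappedRDD", lines)
pairs  = RDD("MappedRDD", words)
counts = RDD("ShuffledRDD", pairs, shuffle=True)

print(split_into_stages(counts))
# [['HadoopRDD', 'FlatMappedRDD', 'MappedRDD'], ['ShuffledRDD']]
```

The narrow-dependency chain is pipelined into one stage; only the ShuffledRDD forces a second stage, and the final stage is submitted last, exactly as step 7 above describes.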
1 textFile: HadoopRDD → MappedRDD (RDD.scala)
2 flatMap: → FlatMappedRDD (RDD.scala)
3 splittedText.map: → MappedRDD (RDD.scala)
4 reduceByKey: → PairRDDFunctions (in /src/PairRDDFunctions.scala)
5 → ShuffledRDD
6 → MapPartitionsRDD
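The word-count lineage above can be imitated with plain Python functions; this is a sketch of the semantics of flatMap, map, and reduceByKey, not Spark's implementation (the defaultdict grouping stands in for the shuffle).

```python
from collections import defaultdict
from functools import reduce

def flat_map(f, data):
    # flatMap: one input element -> zero or more output elements
    return [y for x in data for y in f(x)]

def reduce_by_key(f, pairs):
    # reduceByKey: group values by key (the "shuffle"), then fold each group
    buckets = defaultdict(list)
    for k, v in pairs:
        buckets[k].append(v)
    return {k: reduce(f, vs) for k, vs in buckets.items()}

lines = ["a b a", "b c"]
words = flat_map(str.split, lines)                 # flatMap
pairs = [(w, 1) for w in words]                    # map to (word, 1)
counts = reduce_by_key(lambda a, b: a + b, pairs)  # reduceByKey
print(counts)  # {'a': 2, 'b': 2, 'c': 1}
```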
1 executorMemory conf.getOption().orElse().getOrElse()
taskScheduler
heartbeatReceiver
env.actorSystem.actorOf(
Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
dagScheduler
2 taskScheduler.start()
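The executorMemory line above abbreviates a fallback chain: take the config key if set, else an environment variable, else a default. A minimal Python sketch of that Option-style chaining, assuming the order conf → environment → default and a simplified memory-string parser (the exact keys and default are assumptions for illustration):

```python
import os

def parse_memory_string(s: str) -> int:
    # "512m" -> 512, "2g" -> 2048 (simplified memory-string-to-MB parser)
    units = {"k": 1 / 1024, "m": 1, "g": 1024, "t": 1024 * 1024}
    s = s.strip().lower()
    if s[-1] in units:
        return int(float(s[:-1]) * units[s[-1]])
    return int(s)

def executor_memory_mb(conf: dict, default_mb: int = 512) -> int:
    # Mirrors conf.getOption(...).orElse(env).getOrElse(default):
    # take the first source that yields a value.
    value = (conf.get("spark.executor.memory")
             or os.environ.get("SPARK_EXECUTOR_MEMORY"))
    if value is None:
        return default_mb
    return parse_memory_string(value)

print(executor_memory_mb({"spark.executor.memory": "2g"}))  # 2048
print(executor_memory_mb({}))  # -> 512 when SPARK_EXECUTOR_MEMORY is unset
```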
1 SchedulerBackend
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) with AppClientListener with Logging
1 AppClient
2 maxCores
3 start: (1 driverUrl, 2 args, 3 extraJavaOpts, 4 classPathEntries, 5 libraryPathEntries)
4 sparkJavaOpts, javaOpts, command, appDesc
5 new AppClient
6 client.start(), waitForRegistration()
!!: the client's conf includes
1 sc.env.actorSystem, masters
2 appName, maxCores, sc.executorMemory, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir),
command("org.apache.spark.executor.CoarseGrainedExecutorBackend", sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts),
args(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
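The `{{EXECUTOR_ID}}`-style arguments above are placeholders: the command template is shipped to the worker, which fills in the concrete values for the executor it launches. A toy sketch of that substitution step (the value strings are made up for illustration):

```python
def substitute(args, values):
    # Replace any argument that is a known {{...}} placeholder with its
    # concrete value; pass all other arguments through unchanged.
    return [values.get(a, a) for a in args]

template = ["driverUrl", "{{EXECUTOR_ID}}", "{{HOSTNAME}}",
            "{{CORES}}", "{{WORKER_URL}}"]
filled = substitute(template, {
    "{{EXECUTOR_ID}}": "0",
    "{{HOSTNAME}}": "worker-1",
    "{{CORES}}": "4",
    "{{WORKER_URL}}": "spark://worker-1:7078",
})
print(filled)  # ['driverUrl', '0', 'worker-1', '4', 'spark://worker-1:7078']
```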
2 start(): backend.start(); if spark.speculation is enabled, schedule a periodic check:
sc.env.actorSystem.scheduler.schedule(
  SPECULATION_INTERVAL milliseconds, SPECULATION_INTERVAL milliseconds) { checkSpeculatableTasks() }
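A simplified model of what such a checkSpeculatableTasks pass does: once enough tasks in a stage have finished, any still-running task that is much slower than the median finished runtime becomes a candidate for a speculative duplicate. The thresholds and function below are illustrative, not Spark's exact logic.

```python
import statistics

def speculatable_tasks(finished_ms, running_ms,
                       multiplier=1.5, min_finished=0.75):
    """Return IDs of running tasks slower than multiplier * median
    finished runtime, but only once a min_finished fraction of all
    tasks has completed (to get a stable median)."""
    total = len(finished_ms) + len(running_ms)
    if not finished_ms or len(finished_ms) / total < min_finished:
        return []
    threshold = multiplier * statistics.median(finished_ms)
    return [tid for tid, t in running_ms.items() if t > threshold]

done = [100, 110, 120, 105, 95, 115]     # runtimes of finished tasks (ms)
running = {"task-5": 500, "task-6": 130}  # elapsed time of running tasks
print(speculatable_tasks(done, running))  # ['task-5']
```

Here the median finished runtime is 107.5 ms, so only task-5 (500 ms) crosses the 1.5x threshold and would be relaunched speculatively on another executor.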
1 connected to the cluster, or temporarily disconnected
2 temporary disconnection caused by master failure
3 unrecoverable application failure
4 add an executor
5 remove an executor
val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
actor ! RegisterApplication(appDescription)
1 RegisteredApplication
2 ApplicationRemoved from the master: terminate
3 ExecutorAdded
4 ExecutorUpdated: whether the executor has completed
5 MasterChanged from the new master
6 StopAppClient from AppClient::stop()
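The six messages above are handled by a single receive loop in the client actor. A toy Python dispatcher capturing the shape of that handling; the state fields and handler bodies are placeholders, only the message names come from the list above.

```python
def app_client_receive(state, msg):
    # state is a dict; messages are (name, payload) tuples.
    name, payload = msg
    if name == "RegisteredApplication":
        state["app_id"], state["registered"] = payload, True
    elif name == "ApplicationRemoved":
        state["alive"] = False            # terminate
    elif name == "ExecutorAdded":
        state["executors"].add(payload)
    elif name == "ExecutorUpdated":
        exec_id, completed = payload
        if completed:
            state["executors"].discard(exec_id)
    elif name == "MasterChanged":
        state["master"] = payload         # re-register with the new master
    elif name == "StopAppClient":
        state["alive"] = False
    return state

state = {"executors": set(), "alive": True, "registered": False}
app_client_receive(state, ("RegisteredApplication", "app-001"))
app_client_receive(state, ("ExecutorAdded", "exec-0"))
print(state["registered"], sorted(state["executors"]))  # True ['exec-0']
```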
1 logInfo
2 val app =
ApplicationInfo(now, newApplicationId(date), desc, date, driver, defaultCores); here driver is the AppClient's actor
3 registerApplication(app): save the app's info on the master
4 persistenceEngine.addApplication(app)
5 schedule()
Notation: → call or next step; : definition or content; () including; () parameter or input; tab: explanation; & at the same time; ? not sure; ?? serious problem
→(sth): sth is a condition or parameter; !!: conclusion or attention
Original post: http://www.cnblogs.com/yumanman/p/satyrs.html