SparkContext can fairly be called the engine of a Spark application; the initialization of the Spark Driver revolves around the initialization of SparkContext.
SparkContext is made up of a number of internal components, all of which are created during its initialization. The rest of this post walks through the SparkContext initialization process.
// This function allows components created by SparkEnv to be mocked in unit tests:
private[spark] def createSparkEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus): SparkEnv = {
  SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
}
private[spark] def env: SparkEnv = _env
The createSparkEnv() method is defined first; it delegates to SparkEnv.createDriverEnv() to create the driver-side SparkEnv.
/* ------------------------------------------------------------------------------------- *
 | Initialization. This code initializes the context in a manner that is exception-safe. |
 | All internal fields holding state are initialized here, and any error prompts the     |
 | stop() method to be called.                                                           |
 * ------------------------------------------------------------------------------------- */

private def warnSparkMem(value: String): String = {
  logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
    "deprecated, please use spark.executor.memory instead.")
  value
}

/** Control our logLevel. This overrides any user-defined log settings.
 * @param logLevel The desired log level as a string.
 * Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
 */
def setLogLevel(logLevel: String) {
  // let's allow lowercase or mixed case too
  val upperCased = logLevel.toUpperCase(Locale.ROOT)
  require(SparkContext.VALID_LOG_LEVELS.contains(upperCased),
    s"Supplied level $logLevel did not match one of:" +
      s" ${SparkContext.VALID_LOG_LEVELS.mkString(",")}")
  Utils.setLogLevel(org.apache.log4j.Level.toLevel(upperCased))
}

try {
  _conf = config.clone()
  _conf.validateSettings()

  if (!_conf.contains("spark.master")) {
    throw new SparkException("A master URL must be set in your configuration")
  }
  if (!_conf.contains("spark.app.name")) {
    throw new SparkException("An application name must be set in your configuration")
  }

  // log out spark.app.name in the Spark driver logs
  logInfo(s"Submitted application: $appName")

  // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
  if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
    throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
      "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
  }

  if (_conf.getBoolean("spark.logConf", false)) {
    logInfo("Spark configuration:\n" + _conf.toDebugString)
  }

  // Set Spark driver host and port system properties. This explicitly sets the configuration
  // instead of relying on the default value of the config constant.
  _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
  _conf.setIfMissing("spark.driver.port", "0")

  _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

  _jars = Utils.getUserJars(_conf)
  _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
    .toSeq.flatten

  _eventLogDir =
    if (isEventLogEnabled) {
      val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
        .stripSuffix("/")
      Some(Utils.resolveURI(unresolvedDir))
    } else {
      None
    }

  _eventLogCodec = {
    val compress = _conf.getBoolean("spark.eventLog.compress", false)
    if (compress && isEventLogEnabled) {
      Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
    } else {
      None
    }
  }

  _listenerBus = new LiveListenerBus(_conf)

  // Initialize the app status store and listener before SparkEnv is created so that it gets
  // all events.
  _statusStore = AppStatusStore.createLiveStore(conf)
  listenerBus.addToStatusQueue(_statusStore.listener.get)

  // Create the Spark execution environment (cache, map output tracker, etc)
  _env = createSparkEnv(_conf, isLocal, listenerBus)
  SparkEnv.set(_env)
Because many of SparkEnv's components post events to the event queues of the LiveListenerBus, the LiveListenerBus is created first. Its main job is to receive events posted by the various components and deliver them asynchronously to the registered listeners.
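As a rough illustration of that role, here is a minimal sketch (not Spark's actual implementation; the class and method names below are made up) of the asynchronous event-bus pattern: producers enqueue events and return immediately, while a daemon thread drains the queue and invokes every registered listener.

import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingQueue}

class SimpleListenerBus {
  private val queue = new LinkedBlockingQueue[AnyRef]()
  private val listeners = new CopyOnWriteArrayList[AnyRef => Unit]()

  // Register a listener callback.
  def addListener(listener: AnyRef => Unit): Unit = listeners.add(listener)

  // Producers (SparkEnv components, in the real bus) just enqueue and return immediately.
  def post(event: AnyRef): Unit = queue.put(event)

  // A single daemon thread drains the queue and notifies every registered listener.
  private val dispatcher = new Thread(new Runnable {
    override def run(): Unit = {
      while (true) {
        val event = queue.take()
        val it = listeners.iterator()
        while (it.hasNext) it.next()(event)
      }
    }
  }, "simple-listener-bus")
  dispatcher.setDaemon(true)
  dispatcher.start()
}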
SparkUI involves too many components to analyze in depth here; it will be covered separately later. The code that creates the SparkUI is shown below.
_statusTracker = new SparkStatusTracker(this, _statusStore)

_progressBar =
  if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
    Some(new ConsoleProgressBar(this))
  } else {
    None
  }

_ui =
  if (conf.getBoolean("spark.ui.enabled", true)) {
    Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
      startTime))
  } else {
    // For tests, do not enable the UI
    None
  }
// Bind the UI before starting the task scheduler to communicate
// the bound port to the cluster manager properly
_ui.foreach(_.bind())
In local mode the Driver and the Executor run on the same node and can interact locally, so failures are easy to notice. In production, however, Executors and the Driver are usually launched on different nodes, so a heartbeat receiver is created in the Driver to let it keep track of the Executors.
// We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
// retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
_heartbeatReceiver = env.rpcEnv.setupEndpoint(
  HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
This code calls the setupEndpoint() method of NettyRpcEnv, a subcomponent of SparkEnv. The method registers the HeartbeatReceiver with the RpcEnv's Dispatcher and returns a NettyRpcEndpointRef referring to the HeartbeatReceiver.
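For intuition, here is a heavily simplified model of that registration-and-reference pattern. All the types below (SimpleRpcEnv, SimpleRpcEndpoint, SimpleRpcEndpointRef) are made up for this sketch; Spark's real NettyRpcEnv, Dispatcher, and NettyRpcEndpointRef additionally handle asynchronous delivery, remote transport, and timeouts.

import scala.collection.concurrent.TrieMap
import scala.concurrent.Future

// Hypothetical, heavily simplified types for illustration only.
trait SimpleRpcEndpoint {
  def receiveAndReply(message: Any): Any
}

class SimpleRpcEndpointRef(endpoint: SimpleRpcEndpoint) {
  // Like RpcEndpointRef.ask[T]: send a message and obtain the (eventual) reply.
  def ask[T](message: Any): Future[T] =
    Future.successful(endpoint.receiveAndReply(message).asInstanceOf[T])
}

class SimpleRpcEnv {
  private val endpoints = TrieMap.empty[String, SimpleRpcEndpointRef]

  // Register an endpoint under a name with the "dispatcher" and hand back a reference to it.
  def setupEndpoint(name: String, endpoint: SimpleRpcEndpoint): SimpleRpcEndpointRef = {
    val ref = new SimpleRpcEndpointRef(endpoint)
    endpoints.put(name, ref)
    ref
  }
}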
TaskScheduler is responsible for requesting that the cluster manager allocate and launch Executors for the application (first-level scheduling) and for assigning tasks to Executors and running them (second-level scheduling); it can be seen as the client side of task scheduling.
DAGScheduler does the preparation work before tasks are formally submitted to TaskSchedulerImpl, including creating the Job, splitting the RDDs of the DAG into Stages, and submitting Stages.
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
createTaskScheduler() returns a pair (a Scala 2-tuple) of SchedulerBackend and TaskScheduler. Once SparkContext's _taskScheduler field holds the TaskScheduler reference, the TaskSchedulerIsSet message is sent; upon receiving it, the HeartbeatReceiver reads the SparkContext's _taskScheduler property and stores it in its own scheduler field.
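As a quick Scala refresher (a toy example unrelated to Spark's API): a method can return a 2-tuple, and the caller can destructure it with a pattern-matching val, which is exactly how (sched, ts) is bound above.

def minMax(xs: Seq[Int]): (Int, Int) = (xs.min, xs.max)   // return a Tuple2

val (lo, hi) = minMax(Seq(3, 1, 4, 1, 5))   // destructuring assignment: lo = 1, hi = 5
val pair = minMax(Seq(2, 7))                // or keep the tuple and access fields positionally
println(pair._1 + ", " + pair._2)           // prints "2, 7"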
/**
 * Create a task scheduler based on a given master URL.
 * Return a 2-tuple of the scheduler backend and the task scheduler.
 */
private def createTaskScheduler(
    sc: SparkContext,
    master: String,
    deployMode: String): (SchedulerBackend, TaskScheduler) = {
  import SparkMasterRegex._

  // When running locally, don't try to re-execute tasks on failure.
  val MAX_LOCAL_TASK_FAILURES = 1

  master match {
    case "local" =>
      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
      scheduler.initialize(backend)
      (backend, scheduler)

    case LOCAL_N_REGEX(threads) =>
      def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
      // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
      val threadCount = if (threads == "*") localCpuCount else threads.toInt
      if (threadCount <= 0) {
        throw new SparkException(s"Asked to run locally with $threadCount threads")
      }
      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
      scheduler.initialize(backend)
      (backend, scheduler)

    case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
      def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
      // local[*, M] means the number of cores on the computer with M failures
      // local[N, M] means exactly N threads with M failures
      val threadCount = if (threads == "*") localCpuCount else threads.toInt
      val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
      scheduler.initialize(backend)
      (backend, scheduler)

    case SPARK_REGEX(sparkUrl) =>
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      (backend, scheduler)

    case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
      // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
      val memoryPerSlaveInt = memoryPerSlave.toInt
      if (sc.executorMemory > memoryPerSlaveInt) {
        throw new SparkException(
          "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
            memoryPerSlaveInt, sc.executorMemory))
      }

      val scheduler = new TaskSchedulerImpl(sc)
      val localCluster = new LocalSparkCluster(
        numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
      val masterUrls = localCluster.start()
      val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
        localCluster.stop()
      }
      (backend, scheduler)

    case masterUrl =>
      val cm = getClusterManager(masterUrl) match {
        case Some(clusterMgr) => clusterMgr
        case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
      }
      try {
        val scheduler = cm.createTaskScheduler(sc, masterUrl)
        val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
        cm.initialize(scheduler, backend)
        (backend, scheduler)
      } catch {
        case se: SparkException => throw se
        case NonFatal(e) =>
          throw new SparkException("External scheduler cannot be instantiated", e)
      }
  }
}
BlockManager is one of SparkEnv's components. It encompasses all of the components and functionality of Spark's storage system and is the most important part of that system; the storage system itself will be studied later.
_applicationId = _taskScheduler.applicationId()
_env.blockManager.initialize(_applicationId)
Spark has its own monitoring system. Monitoring enriches a system's testability, performance tuning, operational assessment, statistics gathering, and more. Spark's metrics system is built on Metrics, the third-party library from Codahale (Dropwizard).
Spark's metrics system revolves around three important concepts: Instance (which component the metrics system is created for, such as the driver or an executor), Source (where metric data comes from), and Sink (where metric data is delivered to, such as the console, CSV files, JMX, or a servlet).
MetricsSystem wraps Sources and Sinks and routes each Source's data to the configured Sinks.
MetricsSystem is one of SparkEnv's internal components and serves as the metrics system of the entire Spark application.
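To make the Source/Sink split concrete, here is a small sketch that uses the underlying Dropwizard (Codahale) Metrics library directly. The object name and metric name are made up, and Spark's own Source and Sink traits (which wrap these classes) are spark-internal, so this only illustrates the shape of the idea: a registry of metrics on one side, a reporter exporting them on the other.

import java.util.concurrent.TimeUnit
import com.codahale.metrics.{ConsoleReporter, MetricRegistry}

object MetricsSketch {
  def main(args: Array[String]): Unit = {
    // The registry plays the role of a Source's metric store.
    val registry = new MetricRegistry()
    val jobsRun = registry.counter("sketch.jobsRun")

    // The reporter plays the role of a Sink: it periodically exports what the Source produced.
    val reporter = ConsoleReporter.forRegistry(registry).build()
    reporter.start(10, TimeUnit.SECONDS)

    jobsRun.inc()   // metric values flow from the Source to the Sink
  }
}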
// The metrics system for Driver need to be set spark.app.id to app ID.
// So it should start after we get app ID from the task scheduler and set spark.app.id.
_env.metricsSystem.start()
// Attach the driver metrics servlet handler to the web ui after the metrics system is started.
_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
This attaches the metrics system's ServletContextHandlers to the SparkUI.
If event logging is enabled, an EventLoggingListener is created and registered on the event-log queue of the listener bus:

_eventLogger =
  if (isEventLogEnabled) {
    val logger =
      new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
        _conf, _hadoopConfiguration)
    logger.start()
    listenerBus.addToEventLogQueue(logger)
    Some(logger)
  } else {
    None
  }
ExecutorAllocationManager is an agent that dynamically allocates and removes Executors based on workload. It periodically computes the number of Executors the current workload requires: if the demand exceeds the number already requested from the cluster manager, it requests additional Executors; otherwise it asks the cluster manager to cancel some of them. It also periodically asks the cluster manager to remove (kill) Executors whose idle timeout has expired.
// Optionally scale number of executors dynamically based on workload. Exposed for testing.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
  if (dynamicAllocationEnabled) {
    schedulerBackend match {
      case b: ExecutorAllocationClient =>
        Some(new ExecutorAllocationManager(
          schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
          _env.blockManager.master))
      case _ =>
        None
    }
  } else {
    None
  }
_executorAllocationManager.foreach(_.start())
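For reference, dynamic allocation is normally switched on with configuration along these lines (an illustrative SparkConf; the values are examples, and the external shuffle service is usually required so that executors can be removed without losing shuffle files):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("dynamic-allocation-example")                  // example app name
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")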
ContextCleaner cleans up resources that have fallen out of the application's scope: RDDs, the map task state and metadata of shuffles, Broadcast objects, and RDD checkpoint data.
_cleaner =
  if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
    Some(new ContextCleaner(this))
  } else {
    None
  }
_cleaner.foreach(_.start())
/** Start the cleaner. */
def start(): Unit = {
  cleaningThread.setDaemon(true)
  cleaningThread.setName("Spark Context Cleaner")
  cleaningThread.start()
  periodicGCService.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = System.gc()
  }, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
}
Apart from the periodic GC timer, ContextCleaner works the same way as the LiveListenerBus: it uses the listener pattern, with an asynchronous thread doing the actual processing.
During initialization, SparkContext also reads the jar files and other files specified by the user.
_jars = Utils.getUserJars(_conf)
_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
  .toSeq.flatten
The jar files are read first, followed by the other files configured by the user. In YARN mode, _jars is the union of the jar files specified by spark.jars and spark.yarn.dist.jars; in other modes, only the jar files specified by spark.jars are used.
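For illustration, the settings read by this code can be supplied on the SparkConf (the paths below are made up) or, equivalently, through spark-submit's --jars and --files options:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("jars-and-files-example")                                  // example app name
  .set("spark.jars", "/opt/app/lib/dep-a.jar,/opt/app/lib/dep-b.jar")    // picked up as _jars
  .set("spark.files", "/opt/app/conf/lookup.csv")                        // picked up as _files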
def jars: Seq[String] = _jars
def files: Seq[String] = _files
// Add each JAR given through the constructor
if (jars != null) {
  jars.foreach(addJar)
}
if (files != null) {
  files.foreach(addFile)
}
addJar adds each jar file to the Driver's RPC environment. Because addJar and addFile can affect the application's environment, the environment is updated at the very end of SparkContext initialization:
postEnvironmentUpdate()
postEnvironmentUpdate()
postApplicationStart()

// Post init
_taskScheduler.postStartHook()   // wait until the SchedulerBackend is ready
// Register Sources with the metrics system
_env.metricsSystem.registerSource(_dagScheduler.metricsSource)
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_executorAllocationManager.foreach { e =>
  _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}

// Make sure the context is stopped if the user forgets about it. This avoids leaving
// unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
// is killed, though.
// Add the SparkContext shutdown hook
logDebug("Adding shutdown hook") // force eager creation of logger
_shutdownHookRef = ShutdownHookManager.addShutdownHook(
  ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
  logInfo("Invoking stop() from shutdown hook")
  try {
    stop()
  } catch {
    case e: Throwable =>
      logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
  }
}
} catch {
  case NonFatal(e) =>
    logError("Error initializing SparkContext.", e)
    try {
      stop()
    } catch {
      case NonFatal(inner) =>
        logError("Error stopping SparkContext after init error.", inner)
    } finally {
      throw e
    }
}

// In order to prevent multiple SparkContexts from being active at the same time, mark this
// context as having finished construction.
// NOTE: this must be placed at the end of the SparkContext constructor.
SparkContext.setActiveContext(this, allowMultipleContexts)
/**
 * Broadcast a read-only variable to the cluster, returning a
 * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
 * The variable will be sent to each cluster only once.
 *
 * @param value value to broadcast to the Spark nodes
 * @return `Broadcast` object, a read-only variable cached on each machine
 */
def broadcast[T: ClassTag](value: T): Broadcast[T] = {
  assertNotStopped()
  require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
    "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
  val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
  val callSite = getCallSite
  logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
  cleaner.foreach(_.registerBroadcastForCleanup(bc))
  bc
}
Essentially, it calls the newBroadcast() method of SparkEnv's BroadcastManager to create the broadcast object.
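A typical use of broadcast() from user code might look like the following sketch, assuming sc is an already-created SparkContext and the lookup data is purely illustrative:

val lookup = Map("a" -> 1, "b" -> 2)
val bcLookup = sc.broadcast(lookup)        // shipped to each executor at most once

val words = sc.parallelize(Seq("a", "b", "a", "c"))
val total = words.map(w => bcLookup.value.getOrElse(w, 0)).sum()   // read it on the executors

bcLookup.destroy()                         // release the broadcast blocks when no longer needed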
addSparkListener is used to add listeners that implement the SparkListenerInterface trait to the LiveListenerBus.
/**
 * :: DeveloperApi ::
 * Register a listener to receive up-calls from events that happen during execution.
 */
@DeveloperApi
def addSparkListener(listener: SparkListenerInterface) {
  listenerBus.addToSharedQueue(listener)
}
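For example, a user-defined listener (the class name here is made up) can be registered through addSparkListener and will then receive up-calls from the shared queue of the LiveListenerBus; this assumes sc is an existing SparkContext:

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

class JobLoggingListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    println(s"Job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stages")

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"Job ${jobEnd.jobId} finished: ${jobEnd.jobResult}")
}

sc.addSparkListener(new JobLoggingListener)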
SparkContext overloads the runJob() method several times; all of the overloads eventually call the runJob() shown below.
/**
 * Run a function on a given set of partitions in an RDD and pass the results to the given
 * handler function. This is the main entry point for all actions in Spark.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 *   partitions of the target RDD, e.g. for operations like `first()`
 * @param resultHandler callback to pass each result to
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  // Call the runJob() method of the DAGScheduler that was created during SparkContext initialization
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()   // save the checkpoint
}
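As an example of an action funnelling through this entry point, one of the simpler runJob overloads can be used to compute a value per partition (assuming sc is an existing SparkContext):

val data = sc.parallelize(1 to 100, numSlices = 4)
val partitionSizes: Array[Int] = sc.runJob(data, (iter: Iterator[Int]) => iter.size)
// partitionSizes has one entry per partition, e.g. Array(25, 25, 25, 25)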
Specifying a directory in which the RDDs of a job store their checkpoints is a prerequisite for enabling the checkpoint mechanism.
/**
 * Set the directory under which RDDs are going to be checkpointed.
 * @param directory path to the directory where checkpoint files will be stored
 * (must be HDFS path if running in cluster)
 */
def setCheckpointDir(directory: String) {
  // If we are running on a cluster, log a warning if the directory is local.
  // Otherwise, the driver may attempt to reconstruct the checkpointed RDD from
  // its own local file system, which is incorrect because the checkpoint files
  // are actually on the executor machines.
  if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
    logWarning("Spark is not running in local mode, therefore the checkpoint directory " +
      s"must not be on the local filesystem. Directory '$directory' " +
      "appears to be on the local filesystem.")
  }

  checkpointDir = Option(directory).map { dir =>
    val path = new Path(dir, UUID.randomUUID().toString)
    val fs = path.getFileSystem(hadoopConfiguration)
    fs.mkdirs(path)
    fs.getFileStatus(path).getPath.toString
  }
}
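Typical usage from an application looks like this sketch (the HDFS path is illustrative, and sc is assumed to be an existing SparkContext):

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")

val rdd = sc.parallelize(1 to 1000).map(_ * 2)
rdd.checkpoint()          // mark the RDD for checkpointing
rdd.count()               // the first action materializes the RDD and writes the checkpoint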
References
1. The Art of Spark Kernel Design: Architecture Design and Implementation (《Spark内核设计的艺术:架构设计与实现》)
2. Spark 2.4.3 source code
Original post: https://www.cnblogs.com/qinglanmei/p/11209281.html