标签:kill jmx tracking func gcs short settings drive store
SparkContext可以说是Spark应用的发动机引擎,Spark Drive的初始化围绕这SparkContext的初始化。
sparkcontxt的主要组成部分
下面学习SparkContext的初始化过程
// This function allows components created by SparkEnv to be mocked in unit tests:
private[spark] def createSparkEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus): SparkEnv = {
SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
}
private[spark] def env: SparkEnv = _env
先是创建createSparkEnv()方法,调用了createDriverEnv()
/* ------------------------------------------------------------------------------------- *
| Initialization. This code initializes the context in a manner that is exception-safe. |
| All internal fields holding state are initialized here, and any error prompts the |
| stop() method to be called. |
* ------------------------------------------------------------------------------------- */
private def warnSparkMem(value: String): String = {
logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
"deprecated, please use spark.executor.memory instead.")
value
}
/** Control our logLevel. This overrides any user-defined log settings.
* @param logLevel The desired log level as a string.
* Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
*/
def setLogLevel(logLevel: String) {
// let‘s allow lowercase or mixed case too
val upperCased = logLevel.toUpperCase(Locale.ROOT)
require(SparkContext.VALID_LOG_LEVELS.contains(upperCased),
s"Supplied level $logLevel did not match one of:" +
s" ${SparkContext.VALID_LOG_LEVELS.mkString(",")}")
Utils.setLogLevel(org.apache.log4j.Level.toLevel(upperCased))
}
try {
_conf = config.clone()
_conf.validateSettings()
if (!_conf.contains("spark.master")) {
throw new SparkException("A master URL must be set in your configuration")
}
if (!_conf.contains("spark.app.name")) {
throw new SparkException("An application name must be set in your configuration")
}
// log out spark.app.name in the Spark driver logs
logInfo(s"Submitted application: $appName")
// System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
throw new SparkException("Detected yarn cluster mode, but isn‘t running on a cluster. " +
"Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
}
if (_conf.getBoolean("spark.logConf", false)) {
logInfo("Spark configuration:\n" + _conf.toDebugString)
}
// Set Spark driver host and port system properties. This explicitly sets the configuration
// instead of relying on the default value of the config constant.
_conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
_conf.setIfMissing("spark.driver.port", "0")
_conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
_jars = Utils.getUserJars(_conf)
_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
.toSeq.flatten
_eventLogDir =
if (isEventLogEnabled) {
val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
.stripSuffix("/")
Some(Utils.resolveURI(unresolvedDir))
} else {
None
}
_eventLogCodec = {
val compress = _conf.getBoolean("spark.eventLog.compress", false)
if (compress && isEventLogEnabled) {
Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
} else {
None
}
}
_listenerBus = new LiveListenerBus(_conf)
// Initialize the app status store and listener before SparkEnv is created so that it gets
// all events.
_statusStore = AppStatusStore.createLiveStore(conf)
listenerBus.addToStatusQueue(_statusStore.listener.get)
// Create the Spark execution environment (cache, map output tracker, etc)
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)
因为SparkEnv的很多组件都向LiveListenerBus的事件总线队列中投递事件,所以首先创建的LiveListenerBus,这个类主要功能如下
SparkUI涉及太多组件,这里暂时不深入剖析,后续单独剖析。下面是创建SparkUI的代码
_statusTracker = new SparkStatusTracker(this, _statusStore)
_progressBar =
if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
Some(new ConsoleProgressBar(this))
} else {
None
}
_ui =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
startTime))
} else {
// For tests, do not enable the UI
None
}
// Bind the UI before starting the task scheduler to communicate
// the bound port to the cluster manager properly
_ui.foreach(_.bind())
如果是local模式,Driver和executor再同一节点,可以直接使用本地交互。出现异常可以方便知道。
当再生产环境中时,往往Executor和Driver是在不同节点上启动的,因此,Driver为了能够掌控Executor,在Driver中创建了心跳接收器。
// We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
// retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
_heartbeatReceiver = env.rpcEnv.setupEndpoint(
HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
代码用了SparkEnv的子组件NettyRpcEnv的setupEndpoint()方法,
该方法的作用:是向RpcEnv的Dispatcher注册HeartbeatReceiver,并返回HeartbeatReceiver的NettyRpcEndPointRef的引用。
TaskScheduler负责请求集群管理器给应用程序分配并运行Executor(一级调度)和给任务分配Executor并运行任务(二级调度),可以看作是任务调度的客户端。
DAGScheduler主要在任务正式交给TaskSchedulerImp提交前的准备工作,包括创建Job、将DAG的RDD划分到不同的stage、提交Stage等。
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler‘s
// constructor
_taskScheduler.start()
createTaskScheduler()方法返回Scheduler和TaskScheduler的对偶(此处补scala的知识),表示SparkContext的_taskScheduler已经有了TAskScheduler的引用,HeartbeatReceiver接收到TaskSchedulerIsSet消息后将获取sparkContext的
_taskScheduler属性设置到自身的scheduler属性中。
/**
* Create a task scheduler based on a given master URL.
* Return a 2-tuple of the scheduler backend and the task scheduler.
*/
private def createTaskScheduler(
sc: SparkContext,
master: String,
deployMode: String): (SchedulerBackend, TaskScheduler) = {
import SparkMasterRegex._
// When running locally, don‘t try to re-execute tasks on failure.
val MAX_LOCAL_TASK_FAILURES = 1
master match {
case "local" =>
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
scheduler.initialize(backend)
(backend, scheduler)
case LOCAL_N_REGEX(threads) =>
def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
// local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
val threadCount = if (threads == "*") localCpuCount else threads.toInt
if (threadCount <= 0) {
throw new SparkException(s"Asked to run locally with $threadCount threads")
}
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
scheduler.initialize(backend)
(backend, scheduler)
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
// local[*, M] means the number of cores on the computer with M failures
// local[N, M] means exactly N threads with M failures
val threadCount = if (threads == "*") localCpuCount else threads.toInt
val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
scheduler.initialize(backend)
(backend, scheduler)
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)
case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
// Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
val memoryPerSlaveInt = memoryPerSlave.toInt
if (sc.executorMemory > memoryPerSlaveInt) {
throw new SparkException(
"Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
memoryPerSlaveInt, sc.executorMemory))
}
val scheduler = new TaskSchedulerImpl(sc)
val localCluster = new LocalSparkCluster(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
val masterUrls = localCluster.start()
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
localCluster.stop()
}
(backend, scheduler)
case masterUrl =>
val cm = getClusterManager(masterUrl) match {
case Some(clusterMgr) => clusterMgr
case None => throw new SparkException("Could not parse Master URL: ‘" + master + "‘")
}
try {
val scheduler = cm.createTaskScheduler(sc, masterUrl)
val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
cm.initialize(scheduler, backend)
(backend, scheduler)
} catch {
case se: SparkException => throw se
case NonFatal(e) =>
throw new SparkException("External scheduler cannot be instantiated", e)
}
}
}
BlockManager是SparkEnv的组件之一,囊括了spark存储体系的所有组件和功能,是存储体系最重要的组件。spark的存储体系后续学习。
_applicationId = _taskScheduler.applicationId()
_env.blockManager.initialize(_applicationId)
spark在监控方面有自己的一套体系,一个系统的监控功能可丰富可测试性、性能优化、运维评估、数据统计等。spark的度量系统使用的是codahale提供的第三方仓库Metrics。
spark的度量系统的三个重要概念:
metricsSystem对Source和Sink进行封装,将Source的数据输出到不同的Sink。
metricsSystem是SparkEnv内部组件之一,是整个spark应用程序的度量系统。
// The metrics system for Driver need to be set spark.app.id to app ID.
// So it should start after we get app ID from the task scheduler and set spark.app.id.
_env.metricsSystem.start()
// Attach the driver metrics servlet handler to the web ui after the metrics system is started.
_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
将系统的ServletContextHandler添加到SparkUI中。
_eventLogger =
if (isEventLogEnabled) {
val logger =
new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
_conf, _hadoopConfiguration)
logger.start()
listenerBus.addToEventLogQueue(logger)
Some(logger)
} else {
None
}
ExecutorAllocationManager是基于工作负载动态分配和删除Executor的代理。
它内部会定时根据工作负载计算所需的Executor数量,
如果对Executor需求大于集群管理器申请的数量,那么向集群管理器添加Executor。反之,向集群管理器申请取消部分Executor。
此外它内部还会定时向集群管理器申请一出(杀死)过期了的Executor。
// Optionally scale number of executors dynamically based on workload. Exposed for testing.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
if (dynamicAllocationEnabled) {
schedulerBackend match {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
_env.blockManager.master))
case _ =>
None
}
} else {
None
}
_executorAllocationManager.foreach(_.start())
用于清理超出应用范围的RDD、shuffle对应的map任务状态、Shuffle元数据、Broadcast对象及RDD的checkpoint数据
_cleaner =
if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
Some(new ContextCleaner(this))
} else {
None
}
_cleaner.foreach(_.start())
/** Start the cleaner. */
def start(): Unit = {
cleaningThread.setDaemon(true)
cleaningThread.setName("Spark Context Cleaner")
cleaningThread.start()
periodicGCService.scheduleAtFixedRate(new Runnable {
override def run(): Unit = System.gc()
}, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
}
除了GC的定时器,ContextCleaner的其余工作原理和listenerBus一样(采用监听器模式,由异步线程来处理)。
SparkContext初始化的时候会读取用户指定的Jar文件或者其他文件
_jars = Utils.getUserJars(_conf)
_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
.toSeq.flatten
首先读取的时Jar文件,然后读取用户设置的其他文件。
当用Yarn模式时,_jars是spark.jars和spark.yarn.dist.jars的Jar文件的并集。
其他模式时,只采用spark.jars指定的Jar文件。
def jars: Seq[String] = _jars def files: Seq[String] = _files
// Add each JAR given through the constructor
if (jars != null) {
jars.foreach(addJar)
}
if (files != null) {
files.foreach(addFile)
}
addJar将Jar文件添加到Driver的RPC环境中。
由于addJar和addFile可能会对应用的环境产生影响,因此在SparkContext初始化的最后对更新环境
postEnvironmentUpdate()
postEnvironmentUpdate()
postApplicationStart()
// Post init
_taskScheduler.postStartHook() // 等待SchedulerBackend准备完成
// 向度量系统注册Source
_env.metricsSystem.registerSource(_dagScheduler.metricsSource)
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_executorAllocationManager.foreach { e =>
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}
// Make sure the context is stopped if the user forgets about it. This avoids leaving
// unfinished event logs around after the JVM exits cleanly. It doesn‘t help if the JVM
// is killed, though.
// 添加SparkContext的关闭钩子
logDebug("Adding shutdown hook") // force eager creation of logger
_shutdownHookRef = ShutdownHookManager.addShutdownHook(
ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
logInfo("Invoking stop() from shutdown hook")
try {
stop()
} catch {
case e: Throwable =>
logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
}
}
} catch {
case NonFatal(e) =>
logError("Error initializing SparkContext.", e)
try {
stop()
} catch {
case NonFatal(inner) =>
logError("Error stopping SparkContext after init error.", inner)
} finally {
throw e
}
}
// In order to prevent multiple SparkContexts from being active at the same time, mark this
// context as having finished construction.
// NOTE: this must be placed at the end of the SparkContext constructor.
SparkContext.setActiveContext(this, allowMultipleContexts)
/**
* Broadcast a read-only variable to the cluster, returning a
* [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
* The variable will be sent to each cluster only once.
*
* @param value value to broadcast to the Spark nodes
* @return `Broadcast` object, a read-only variable cached on each machine
*/
def broadcast[T: ClassTag](value: T): Broadcast[T] = {
assertNotStopped()
require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
"Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
val callSite = getCallSite
logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
cleaner.foreach(_.registerBroadcastForCleanup(bc))
bc
}
实质上是调用了SparkEnv的BroadcastManager的newBroadcast()方法生成广播对象。
用于向LiveListenerBus中提娜佳实现了特质SparkListenerInterface的监听器
/**
* :: DeveloperApi ::
* Register a listener to receive up-calls from events that happen during execution.
*/
@DeveloperApi
def addSparkListener(listener: SparkListenerInterface) {
listenerBus.addToSharedQueue(listener)
}
SparkContext重载了runjob方法。最终都调用下面这个runjob。
/**
* Run a function on a given set of partitions in an RDD and pass the results to the given
* handler function. This is the main entry point for all actions in Spark.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @param partitions set of partitions to run on; some jobs may not want to compute on all
* partitions of the target RDD, e.g. for operations like `first()`
* @param resultHandler callback to pass each result to
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD‘s recursive dependencies:\n" + rdd.toDebugString)
}
//调用sparkContext之前初始化时创建的DAGScheduler的runJob()方法
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint() // 保存检查点
}
给作业中的RDD指定保存检查点的目录,是启用检查点机制的前提。
/**
* Set the directory under which RDDs are going to be checkpointed.
* @param directory path to the directory where checkpoint files will be stored
* (must be HDFS path if running in cluster)
*/
def setCheckpointDir(directory: String) {
// If we are running on a cluster, log a warning if the directory is local.
// Otherwise, the driver may attempt to reconstruct the checkpointed RDD from
// its own local file system, which is incorrect because the checkpoint files
// are actually on the executor machines.
if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
logWarning("Spark is not running in local mode, therefore the checkpoint directory " +
s"must not be on the local filesystem. Directory ‘$directory‘ " +
"appears to be on the local filesystem.")
}
参考
1.《Spark内核设计的艺术架构设计与实现》
2.Spark2.4.3源码
标签:kill jmx tracking func gcs short settings drive store
原文地址:https://www.cnblogs.com/qinglanmei/p/11209281.html