码迷,mamicode.com
首页 > 其他好文 > 详细

spark源码解读-SparkContext初始化过程

时间:2018-08-27 21:57:22      阅读:172      评论:0      收藏:0      [点我收藏+]

标签:应用程序   flat   rop   add   div   status   util   executor   support   

sparkcontext是spark应用程序的入口,每个spark应用都会创建sparkcontext,用于连接spark集群来执行计算任务.在sparkcontext初始化过程中会创建SparkEnv,SparkUI,TaskSchedule,DAGSchedule等多个核心类,我们会逐个分析他们.

下面我们看一下sparkcontext的初始化过程,首先判断一些参数,

try {
    _conf = config.clone()
    _conf.validateSettings()
    if (!_conf.contains("spark.master")) {
      throw new SparkException("A master URL must be set in your configuration")
    }
    if (!_conf.contains("spark.app.name")) {
      throw new SparkException("An application name must be set in your configuration")
    }
    // log out spark.app.name in the Spark driver logs
    logInfo(s"Submitted application: $appName")
    // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
    if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
      throw new SparkException("Detected yarn cluster mode, but isn‘t running on a cluster. " +
        "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
    }
    if (_conf.getBoolean("spark.logConf", false)) {
      logInfo("Spark configuration:\n" + _conf.toDebugString)
    }
    // Set Spark driver host and port system properties. This explicitly sets the configuration
    // instead of relying on the default value of the config constant.
    _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
    _conf.setIfMissing("spark.driver.port", "0")

    _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

    _jars = Utils.getUserJars(_conf)
    _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
      .toSeq.flatten

    _eventLogDir =
      if (isEventLogEnabled) {
        val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
          .stripSuffix("/")
        Some(Utils.resolveURI(unresolvedDir))
      } else {
        None
      }

    _eventLogCodec = {
      val compress = _conf.getBoolean("spark.eventLog.compress", false)
      if (compress && isEventLogEnabled) {
        Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
      } else {
        None
      }
    }

    _listenerBus = new LiveListenerBus(_conf)
    _statusStore = AppStatusStore.createLiveStore(conf)
    listenerBus.addToStatusQueue(_statusStore.listener.get)

  

 

spark源码解读-SparkContext初始化过程

标签:应用程序   flat   rop   add   div   status   util   executor   support   

原文地址:https://www.cnblogs.com/chengwuyouxin/p/9544046.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!