Spark on K8s Source Code Analysis
sparkk8s
time: 2019-12-19
This article analyzes how a Spark job is submitted to Kubernetes, based on the spark-3.0.0-preview source code.

The Spark-on-K8s code lives under the `resource-managers/kubernetes` module of the Spark source tree, which is made up of the following parts:
```
$ tree kubernetes -L 1
kubernetes
├── core
├── docker
└── integration-tests
```

The code layout under kubernetes/core/src/main is as follows:
```
$ tree core/src/main/scala -L 7
core/src/main/scala
└── org
    └── apache
        └── spark
            ├── deploy
            │   └── k8s
            │       ├── Config.scala
            │       ├── Constants.scala
            │       ├── features
            │       │   ├── BasicDriverFeatureStep.scala
            │       │   ├── BasicExecutorFeatureStep.scala
            │       │   ├── DriverCommandFeatureStep.scala
            │       │   ├── DriverKubernetesCredentialsFeatureStep.scala
            │       │   ├── DriverServiceFeatureStep.scala
            │       │   ├── EnvSecretsFeatureStep.scala
            │       │   ├── ExecutorKubernetesCredentialsFeatureStep.scala
            │       │   ├── HadoopConfDriverFeatureStep.scala
            │       │   ├── KerberosConfDriverFeatureStep.scala
            │       │   ├── KubernetesFeatureConfigStep.scala
            │       │   ├── LocalDirsFeatureStep.scala
            │       │   ├── MountSecretsFeatureStep.scala
            │       │   ├── MountVolumesFeatureStep.scala
            │       │   └── PodTemplateConfigMapStep.scala
            │       ├── KubernetesConf.scala
            │       ├── KubernetesDriverSpec.scala
            │       ├── KubernetesUtils.scala
            │       ├── KubernetesVolumeSpec.scala
            │       ├── KubernetesVolumeUtils.scala
            │       ├── SparkKubernetesClientFactory.scala
            │       ├── SparkPod.scala
            │       └── submit
            │           ├── K8sSubmitOps.scala
            │           ├── KubernetesClientApplication.scala
            │           ├── KubernetesDriverBuilder.scala
            │           ├── LoggingPodStatusWatcher.scala
            │           └── MainAppResource.scala
            └── scheduler
                └── cluster
                    └── k8s
                        ├── ExecutorPodsAllocator.scala
                        ├── ExecutorPodsLifecycleManager.scala
                        ├── ExecutorPodsPollingSnapshotSource.scala
                        ├── ExecutorPodsSnapshot.scala
                        ├── ExecutorPodsSnapshotsStoreImpl.scala
                        ├── ExecutorPodsSnapshotsStore.scala
                        ├── ExecutorPodStates.scala
                        ├── ExecutorPodsWatchSnapshotSource.scala
                        ├── KubernetesClusterManager.scala
                        ├── KubernetesClusterSchedulerBackend.scala
                        └── KubernetesExecutorBuilder.scala

10 directories, 39 files
```

 
The docker directory contains the Dockerfiles used to build the Spark images:
```
$ tree kubernetes/docker/src/main -L 5
kubernetes/docker/src/main
└── dockerfiles
    └── spark
        ├── bindings
        │   ├── python
        │   │   └── Dockerfile
        │   └── R
        │       └── Dockerfile
        ├── Dockerfile
        └── entrypoint.sh

5 directories, 4 files
```
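These Dockerfiles are normally consumed through the `bin/docker-image-tool.sh` helper rather than invoked directly; the same tool has flags for the Python and R binding Dockerfiles. A typical, illustrative build looks like the following, where `<repo>` and the tag are placeholders for your own registry and version:

```bash
# Build the Spark image from the Dockerfiles shown above, then push it to <repo>.
./bin/docker-image-tool.sh -r <repo> -t v3.0.0-preview build
./bin/docker-image-tool.sh -r <repo> -t v3.0.0-preview push
```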
 
 
1. Spark Submit
Let's start by submitting the spark-pi example:
```bash
$ ./bin/spark-submit \
    --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
    --deploy-mode cluster \
    --name spark-pi \
    --class org.apache.spark.examples.SparkPi \
    --conf spark.executor.instances=5 \
    --conf spark.kubernetes.container.image=<spark-image> \
    local:///path/to/examples.jar
```
 
The bin/spark-submit script itself is only a thin wrapper: it locates SPARK_HOME and forwards every argument to spark-class:

```bash
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

# Note: all of the spark-submit arguments are forwarded to spark-class
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
```
 
The heart of this chain is the build_command function in the spark-class script (lines 71-74 of the version quoted below). Its job is to take the environment plus all of the spark-submit arguments and assemble a java command of the rough shape

```bash
java -Xmx128m *** -cp *** com.demo.Main ***.jar
```

and the main class that ends up being launched is org.apache.spark.deploy.SparkSubmit.
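For intuition, for the spark-pi submission above the launcher produces (and spark-class then execs) something roughly like the following; the java path and classpath are illustrative placeholders, not taken from the source:

```bash
# Approximate shape of the final command assembled by org.apache.spark.launcher.Main
/usr/lib/jvm/java-8-openjdk/bin/java -cp '/opt/spark/conf/:/opt/spark/jars/*' \
  org.apache.spark.deploy.SparkSubmit \
  --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.executor.instances=5 \
  --conf spark.kubernetes.container.image=<spark-image> \
  local:///path/to/examples.jar
```

For reference, here is the full spark-class script: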
```bash
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

. "${SPARK_HOME}"/bin/load-spark-env.sh

# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ "$(command -v java)" ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

# Find Spark jars.
if [ -d "${SPARK_HOME}/jars" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi

if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
  echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
  echo "You need to build Spark with the target \"package\" before running this program." 1>&2
  exit 1
else
  LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi

# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

# For tests
if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR
  unset HADOOP_CONF_DIR
fi

# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
#
# The exit code of the launcher is appended to the output, so the parent shell removes it from the
# command array and checks the value to see if the launcher succeeded.

build_command() {
  "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

# Turn off posix mode since it does not allow process substitution
set +o posix
CMD=()
DELIM=$'\n'
CMD_START_FLAG="false"
while IFS= read -d "$DELIM" -r ARG; do
  if [ "$CMD_START_FLAG" == "true" ]; then
    CMD+=("$ARG")
  else
    if [ "$ARG" == $'\0' ]; then
      # After NULL character is consumed, change the delimiter and consume command string.
      DELIM=''
      CMD_START_FLAG="true"
    elif [ "$ARG" != "" ]; then
      echo "$ARG"
    fi
  fi
done < <(build_command "$@")

COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}

# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
# the code that parses the output of the launcher to get confused. In those cases, check if the
# exit code is an integer, and if it's not, handle it as a special error case.
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
  echo "${CMD[@]}" | head -n-1 1>&2
  exit 1
fi

if [ $LAUNCHER_EXIT_CODE != 0 ]; then
  exit $LAUNCHER_EXIT_CODE
fi

CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"
```
 
SparkSubmit
Launched via the java command, execution first enters the main method of object SparkSubmit, which:
- constructs a SparkSubmit object, and
- calls SparkSubmit.doSubmit.
```scala
  override def main(args: Array[String]): Unit = {
    val submit = new SparkSubmit() {
      self =>

      override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
        new SparkSubmitArguments(args) {
          override protected def logInfo(msg: => String): Unit = self.logInfo(msg)

          override protected def logWarning(msg: => String): Unit = self.logWarning(msg)

          override protected def logError(msg: => String): Unit = self.logError(msg)
        }
      }

      override protected def logInfo(msg: => String): Unit = printMessage(msg)

      override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")

      override protected def logError(msg: => String): Unit = printMessage(s"Error: $msg")

      override def doSubmit(args: Array[String]): Unit = {
        try {
          super.doSubmit(args)
        } catch {
          case e: SparkUserAppException =>
            exitFn(e.exitCode)
        }
      }

    }

    submit.doSubmit(args)
  }
```
Next we step into SparkSubmit.doSubmit, which does two things:
- parseArguments(args) builds a SparkSubmitArguments object
- the submit() method is invoked
```scala
  def doSubmit(args: Array[String]): Unit = {

    val uninitLog = initializeLogIfNecessary(true, silent = true)

    val appArgs = parseArguments(args)
    if (appArgs.verbose) {
      logInfo(appArgs.toString)
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
      case SparkSubmitAction.PRINT_VERSION => printVersion()
    }
  }
```
Next we step into submit():
- It first checks whether this is a standalone-cluster REST submission; since our cluster runs in Kubernetes native mode, the else branch is taken and doRunMain() is called.
- Because our spark-submit did not pass --proxy-user, doRunMain() also takes its else branch and goes straight into runMain().
```scala
  @tailrec
  private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {

    def doRunMain(): Unit = {
      if (args.proxyUser != null) {
        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
          UserGroupInformation.getCurrentUser())
        try {
          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              runMain(args, uninitLog)
            }
          })
        } catch {
          case e: Exception =>
            if (e.getStackTrace().length == 0) {
              error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
            } else {
              throw e
            }
        }
      } else {
        runMain(args, uninitLog)
      }
    }

    if (args.isStandaloneCluster && args.useRest) {
      try {
        logInfo("Running Spark using the REST application submission protocol.")
        doRunMain()
      } catch {
        case e: SubmitRestConnectionException =>
          logWarning(s"Master endpoint ${args.master} was not a REST server. " +
            "Falling back to legacy submission gateway instead.")
          args.useRest = false
          submit(args, false)
      }
    } else {
      doRunMain()
    }
  }
```
runMain() has two key steps:

- initialize childArgs, childClasspath, sparkConf and childMainClass;
- instantiate childMainClass.

Here "child" refers to the client main class that submits the job for a given resource manager and deploy mode; for Kubernetes cluster mode this is org.apache.spark.deploy.k8s.submit.KubernetesClientApplication (the constant KUBERNETES_CLUSTER_SUBMIT_CLASS seen later).
```scala
  private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

    if (uninitLog) {
      Logging.uninitialize()
    }

    if (args.verbose) {
      logInfo(s"Main class:\n$childMainClass")
      logInfo(s"Arguments:\n${childArgs.mkString("\n")}")
      logInfo(s"Spark config:\n${Utils.redact(sparkConf.getAll.toMap).mkString("\n")}")
      logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
      logInfo("\n")
    }
    val loader = getSubmitClassLoader(sparkConf)
    for (jar <- childClasspath) {
      addJarToClasspath(jar, loader)
    }

    var mainClass: Class[_] = null

    try {
      mainClass = Utils.classForName(childMainClass)
    } catch {
      case e: ClassNotFoundException =>
        logError(s"Failed to load class $childMainClass.")
        if (childMainClass.contains("thriftserver")) {
          logInfo(s"Failed to load main class $childMainClass.")
          logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
        }
        throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
      case e: NoClassDefFoundError =>
        logError(s"Failed to load $childMainClass: ${e.getMessage()}")
        if (e.getMessage.contains("org/apache/hadoop/hive")) {
          logInfo(s"Failed to load hive class.")
          logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
        }
        throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
    }

    val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
    } else {
      new JavaMainApplication(mainClass)
    }
```
Now let's walk through the two steps of runMain() in detail.

Step 1: initialize the Spark application configuration

```scala
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
```

prepareSubmitEnvironment() contains the following key parts:

1. Determine the cluster manager and deploy mode:
```scala
    val clusterManager: Int = args.master match {
      case "yarn" => YARN
      case m if m.startsWith("spark") => STANDALONE
      case m if m.startsWith("mesos") => MESOS
      case m if m.startsWith("k8s") => KUBERNETES
      case m if m.startsWith("local") => LOCAL
      case _ =>
        error("Master must either be yarn or start with spark, mesos, k8s, or local")
        -1
    }

    var deployMode: Int = args.deployMode match {
      case "client" | null => CLIENT
      case "cluster" => CLUSTER
      case _ =>
        error("Deploy mode must be either client or cluster")
        -1
    }
```
Once it is confirmed that the spark-submit arguments describe a Kubernetes cluster-mode submission, the method moves on to:

2. Assemble the application's classpath, files, sparkConf and childMainClass.

The Kubernetes-related blocks inside prepareSubmitEnvironment() are shown below.

Normalize the Spark master URL for K8s mode:
```scala
    if (clusterManager == KUBERNETES) {
      args.master = Utils.checkAndGetK8sMasterUrl(args.master)

      if (!Utils.classIsLoadable(KUBERNETES_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
        error(
          "Could not load KUBERNETES classes. " +
            "This copy of Spark may not have been compiled with KUBERNETES support.")
      }
    }
```
Build flag variables for the various cluster-manager/deploy-mode combinations:
```scala
    val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
    val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
    val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
    val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
    val isKubernetesClient = clusterManager == KUBERNETES && deployMode == CLIENT
    val isKubernetesClusterModeDriver = isKubernetesClient &&
      sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)
```
Since we are running in Kubernetes cluster mode, isKubernetesCluster and isKubernetesClusterModeDriver steer the code into the K8s-specific paths for resolving jar dependencies and downloading remote files; YARN and Mesos take different paths.

Add the resolved dependency jars to the classpath:
```scala
    if (!isMesosCluster && !isStandAloneCluster) {

      val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
        args.packagesExclusions, args.packages, args.repositories, args.ivyRepoPath,
        args.ivySettingsPath)

      if (!StringUtils.isBlank(resolvedMavenCoordinates)) {

        if (isKubernetesClusterModeDriver) {
          val loader = getSubmitClassLoader(sparkConf)
          for (jar <- resolvedMavenCoordinates.split(",")) {
            addJarToClasspath(jar, loader)
          }
        } else if (isKubernetesCluster) {

          childClasspath ++= resolvedMavenCoordinates.split(",")
        } else {
          args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
          if (args.isPython || isInternal(args.primaryResource)) {
            args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
          }
        }
      }
```
Download the remote files the application depends on:
```scala
    var localPrimaryResource: String = null
    var localJars: String = null
    var localPyFiles: String = null
    if (deployMode == CLIENT) {
      localPrimaryResource = Option(args.primaryResource).map {
        downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr)
      }.orNull
      localJars = Option(args.jars).map {
        downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
      }.orNull
      localPyFiles = Option(args.pyFiles).map {
        downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
      }.orNull

      if (isKubernetesClusterModeDriver) {

        args.jars = renameResourcesToLocalFS(args.jars, localJars)
        val localFiles = Option(args.files).map {
          downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
        }.orNull
        args.files = renameResourcesToLocalFS(args.files, localFiles)
      }
    }
```
> Initialize sparkConf

```scala
    val options = List[OptionAssigner](

      OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.master"),
      OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        confKey = SUBMIT_DEPLOY_MODE.key),
      OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.app.name"),
      OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.jars.ivy"),
      OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
        confKey = DRIVER_MEMORY.key),
      OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        confKey = DRIVER_CLASS_PATH.key),
      OptionAssigner(args.driverExtraJavaOptions, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        confKey = DRIVER_JAVA_OPTIONS.key),
      OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        confKey = DRIVER_LIBRARY_PATH.key),
      OptionAssigner(args.principal, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        confKey = PRINCIPAL.key),
      OptionAssigner(args.keytab, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        confKey = KEYTAB.key),
      OptionAssigner(args.pyFiles, ALL_CLUSTER_MGRS, CLUSTER, confKey = SUBMIT_PYTHON_FILES.key),

      OptionAssigner(args.packages, STANDALONE | MESOS | KUBERNETES,
        CLUSTER, confKey = "spark.jars.packages"),
      OptionAssigner(args.repositories, STANDALONE | MESOS | KUBERNETES,
        CLUSTER, confKey = "spark.jars.repositories"),
      OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS | KUBERNETES,
        CLUSTER, confKey = "spark.jars.ivy"),
      OptionAssigner(args.packagesExclusions, STANDALONE | MESOS | KUBERNETES,
        CLUSTER, confKey = "spark.jars.excludes"),

      OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.queue"),
      OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.pyFiles",
        mergeFn = Some(mergeFileLists(_, _))),
      OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars",
        mergeFn = Some(mergeFileLists(_, _))),
      OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files",
        mergeFn = Some(mergeFileLists(_, _))),
      OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.archives",
        mergeFn = Some(mergeFileLists(_, _))),

      OptionAssigner(args.numExecutors, YARN | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = EXECUTOR_INSTANCES.key),
      OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = EXECUTOR_CORES.key),
      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = EXECUTOR_MEMORY.key),
      OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = CORES_MAX.key),
      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = FILES.key),
      OptionAssigner(args.jars, LOCAL, CLIENT, confKey = JARS.key),
      OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = JARS.key),
      OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
        confKey = DRIVER_MEMORY.key),
      OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
        confKey = DRIVER_CORES.key),
      OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
        confKey = DRIVER_SUPERVISE.key),
      OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, confKey = "spark.jars.ivy"),

      OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.repl.local.jars")
    )

    for (opt <- options) {
      if (opt.value != null &&
          (deployMode & opt.deployMode) != 0 &&
          (clusterManager & opt.clusterManager) != 0) {
        if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
        if (opt.confKey != null) {
          if (opt.mergeFn.isDefined && sparkConf.contains(opt.confKey)) {
            sparkConf.set(opt.confKey, opt.mergeFn.get.apply(sparkConf.get(opt.confKey), opt.value))
          } else {
            sparkConf.set(opt.confKey, opt.value)
          }
        }
      }
    }
```
Initialize childMainClass and pass the Spark application's main class (or the R/Python entry file) into childArgs.

In client mode, childMainClass is simply the application's own main class:
```scala
    if (deployMode == CLIENT) {
      childMainClass = args.mainClass
      if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
        childClasspath += localPrimaryResource
      }
      if (localJars != null) { childClasspath ++= localJars.split(",") }
    }
```
In cluster mode, childMainClass is the Kubernetes client main class, which in turn launches the Spark application's main class:
```scala
    if (isKubernetesCluster) {
      childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS
      if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
        if (args.isPython) {
          childArgs ++= Array("--primary-py-file", args.primaryResource)
          childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
        } else if (args.isR) {
          childArgs ++= Array("--primary-r-file", args.primaryResource)
          childArgs ++= Array("--main-class", "org.apache.spark.deploy.RRunner")
        } else {
          childArgs ++= Array("--primary-java-resource", args.primaryResource)
          childArgs ++= Array("--main-class", args.mainClass)
        }
      } else {
        childArgs ++= Array("--main-class", args.mainClass)
      }
      if (args.childArgs != null) {
        args.childArgs.foreach { arg =>
          childArgs += ("--arg", arg)
        }
      }
    }
```
Finally, childArgs, childClasspath, sparkConf and childMainClass are fully initialized and returned.

Step 2: run the Spark application

After step 1, runMain() executes the main entry point of childMainClass. In cluster mode this is the client class that submits the job to YARN, Kubernetes or Mesos, and that client in turn invokes the Spark main class we wrote, in this case the SparkPi example:
```scala
package org.apache.spark.examples

import scala.math.random

import org.apache.spark.sql.SparkSession

object SparkPi {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("Spark Pi")
      .getOrCreate()
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt
    val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y <= 1) 1 else 0
    }.reduce(_ + _)
    println(s"Pi is roughly ${4.0 * count / (n - 1)}")
    spark.stop()
  }
}
```
So once spark-class has exec'd the java command, the call chain is SparkSubmit.main -> KubernetesClientApplication.start -> SparkPi.main.

With the call chain clear, look at the final fragment of runMain(): it ends by calling start(childArgs, sparkConf) on the SparkApplication interface.
```scala
    val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
    } else {
      new JavaMainApplication(mainClass)
    }

    @tailrec
    def findCause(t: Throwable): Throwable = t match {
      case e: UndeclaredThrowableException =>
        if (e.getCause() != null) findCause(e.getCause()) else e
      case e: InvocationTargetException =>
        if (e.getCause() != null) findCause(e.getCause()) else e
      case e: Throwable =>
        e
    }

    try {
      app.start(childArgs.toArray, sparkConf)
    } catch {
      case t: Throwable =>
        throw findCause(t)
    }
```
Now let's step into KubernetesClientApplication.start():
```scala
private[spark] class KubernetesClientApplication extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val parsedArguments = ClientArguments.fromCommandLineArgs(args)
    run(parsedArguments, conf)
  }

  private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = {
    val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")

    val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
    val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
    val kubernetesConf = KubernetesConf.createDriverConf(
      sparkConf,
      kubernetesAppId,
      clientArguments.mainAppResource,
      clientArguments.mainClass,
      clientArguments.driverArgs)

    val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
    val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None

    val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)

    Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
      master,
      Some(kubernetesConf.namespace),
      KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
      SparkKubernetesClientFactory.ClientType.Submission,
      sparkConf,
      None,
      None)) { kubernetesClient =>
        val client = new Client(
          kubernetesConf,
          new KubernetesDriverBuilder(),
          kubernetesClient,
          waitForAppCompletion,
          watcher)
        client.run()
    }
  }
}
```
Here the Client connects to the Kubernetes API server, builds the Kubernetes driver pod, and submits the Spark-on-K8s job.
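Before moving on, it helps to see the pattern the Client relies on: build a pod spec with the fabric8 Kubernetes client and ask the API server to create it. The code below is a minimal, hypothetical sketch of that pattern only, not Spark's actual Client implementation; the pod name, namespace, labels and image are made-up placeholders.

```scala
import io.fabric8.kubernetes.api.model.PodBuilder
import io.fabric8.kubernetes.client.DefaultKubernetesClient

object DriverPodSketch {
  def main(args: Array[String]): Unit = {
    // DefaultKubernetesClient picks up ~/.kube/config (or in-cluster config) automatically.
    val client = new DefaultKubernetesClient()
    try {
      // Assemble a bare-bones driver-like pod spec; all names, labels and images are placeholders.
      val driverPod = new PodBuilder()
        .withNewMetadata()
          .withName("spark-pi-driver")
          .addToLabels("spark-role", "driver")
        .endMetadata()
        .withNewSpec()
          .addNewContainer()
            .withName("spark-kubernetes-driver")
            .withImage("<spark-image>")
          .endContainer()
          .withRestartPolicy("Never")
        .endSpec()
        .build()

      // Create the pod through the API server, which is essentially what Client.run() ends up doing.
      client.pods().inNamespace("default").create(driverPod)
    } finally {
      client.close()
    }
  }
}
```

In the real code path, KubernetesDriverBuilder assembles the driver pod spec from the feature steps listed in the directory tree earlier (BasicDriverFeatureStep, DriverServiceFeatureStep, and so on), and the same fabric8 client is used to create the pod and its auxiliary resources while LoggingPodStatusWatcher reports the pod's status.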
Next, we will analyze how the driver pod in turn builds the executor pods and distributes the work.

To be continued...