标签:kpi one 上启 ssi 生产环境 main方法 chcon eth 源码分析
Spark有3种集群管理器:
生产环境中一般使用yarn cluser
模式
yarn主要有两个作用.
driver
基于此RPC分配计算任务ApplicationMaster
container
如何查看源码:
Spark-Submit
脚本, 启动SparkSubmit
进程SparkSubmit
进程通过反射的方式调用Client的main方法ResourceManager
发送指令启动ApplicationMaster
ResouceManager
选择一个NodeManager
, 并在该NM
上启动ApplictionMaster
ApplictionMaster
是一个yarn任务运行时第一个由RM启动的container,然后负责整个任务的运行,包括container的申请、启动、kill、状态检查等。ApplicationMaster属于应用程序级,其实现不是由Yarn框架提供(历史原因,yarn提供了MapReduce的ApplicationMaster的实现),需要用户自己实现ApplicationMaster进程的具体实现。
ApplictionMaster
进程启动后, 会启动Driver
子线程, 执行用户作业ApplictionMaster
进程向RM
申请资源, 在NM
申请一个container
启动ExecutorBackend
.
ExecutorBackend
用于进程间的通信
AM
发送指令, 在NM
上启动ExecutorBackend
进程NM
启动ExecutorBackend
进程ExecutorBackend
向Driver
注册自己Driver
注册成功后, ExecutorBackend
创建Executor对象Yarn cluster模式在执行启动脚本后会依此运行以下3种java进程
SparkSubmit
ApplicationMaster
: Driver
作为一个线程运行在该进程中.CoarseGrainedExecutorBackend
SparkSubmit
进程Spark-Submit
脚本, 启动SparkSubmit
进程 Spark-Submit
脚本启动SparkSubmit
进程bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster ./examples/jars/spark-examples_2.11-2.1.1.jar 100
之后yarn会依次启动以下3个进程:
- SparkSubmit
- ApplicationMaster
- CoarseGrainedExecutorBackend
SparkSubmit
的主类是org.apache.spark.deploy.SparkSubmit
. 查看改主类方法
def main(args: Array[String]): Unit = {
/*
参数
--master yarn
--deploy-mode cluster
--class org.apache.spark.examples.SparkPi
./examples/jars/spark-examples_2.11-2.1.1.jar 100
*/
val appArgs = new SparkSubmitArguments(args)
appArgs.action match {
// 如果没有指定 action, 则 action 的默认值是: action = Option(action).getOrElse(SUBMIT)
case SparkSubmitAction.SUBMIT => submit(appArgs) // 接下来调用该方法
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
}
/**
* 使用提供的参数提交应用程序
* 有 2 步:
* 1. 准备启动环境.
* 根据集群管理器和部署模式为 child main class 设置正确的 classpath,
系统属性,应用参数
* 2. 使用启动环境调用 child main class 的 main 方法
*/
@tailrec
private def submit(args: SparkSubmitArguments): Unit = {
// 准备提交环境 childMainClass = "org.apache.spark.deploy.yarn.Client", 获得Client的主类
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
def doRunMain(): Unit = {
if (args.proxyUser != null) {
} else {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
}
if (args.isStandaloneCluster && args.useRest) {
// 在其他任何模式, 仅仅运行准备好的主类
} else {
doRunMain()
}
}
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
// 在 yarn 集群模式下, 使用 yarn.Client 来封装一下 user class
childMainClass = "org.apache.spark.deploy.yarn.Client"
}
def doRunMain(): Unit = {
if (args.proxyUser != null) {
} else {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
}
/**
**
使用给定启动环境运行 child class 的 main 方法
* 注意: 如果使用了 cluster deploy mode, 主类并不是用户提供
*/
private def runMain(
childArgs: Seq[String],
childClasspath: Seq[String],
sysProps: Map[String, String],
childMainClass: String,
verbose: Boolean): Unit = {
var mainClass: Class[_] = null
try {
// 使用反射的方式加载 childMainClass = "org.apache.spark.deploy.yarn.Client"
mainClass = Utils.classForName(childMainClass)
} catch {
}//
反射出来 Client 的 main 方法
val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
if (!Modifier.isStatic(mainMethod.getModifiers)) {
throw new IllegalStateException("The main method in the given main class must be static")
}
try {
// 通过反射的方式调用 main 方法
mainMethod.invoke(null, childArgs.toArray)
} catch {
}
}
主要关注Cilent是如何通过RM创建Application
def main(argStrings: Array[String]) {
// 设置环境变量 SPARK_YARN_MODE 表示运行在 YARN mode
// 注意: 任何带有 SPARK_ 前缀的环境变量都会分发到所有的进程, 也包括远程
进程
System.setProperty("SPARK_YARN_MODE", "true")
val sparkConf = new SparkConf
// 对传递来的参数进一步封装
val args = new ClientArguments(argStrings)
new Client(args, sparkConf).run()
}
def run(): Unit = {
// 提交应用, 返回应用的 id
this.appId = submitApplication()
}
调用org.apache.hadoop.yarn.client.api.YarnClient的两个api方法
- createApplication方法通过RPC与ResourceManager进程通信(rmClient.getNewApplication(request)),让其分配一个新的Application,结果存在GetNewApplicationResponse实体中,其中包括ApplicationId、集群最大可分配资源。createApplication的结果存在YarnClientApplication实体中。
- 客户端获取到YarnClientApplication后需要设置其中的上下文对象中的信息org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext,包括aplicationName、资源、队列、优先级、ApplicationMaster启动命令(在ContainerLaunchContext实体中,普通Container启动也使用这个实体),最后调用上面提到的第二个方法submitApplication,将ApplicationSubmissionContext实体传到ResourceManger端(rmClient.submitApplication(request);)。
/**
*
* 向 ResourceManager 提交运行 ApplicationMaster 的应用程序。
*
*/
def submitApplication(): ApplicationId = {
var appId: ApplicationId = null
try {
// 初始化 yarn 客户端
yarnClient.init(yarnConf)
// 启动 yarn 客户端
yarnClient.start()
// 从 RM 创建一个应用程序
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse() // 与ResourceManager进程通信, 获得ApplicationId、集群最大可分配资源
appId = newAppResponse.getApplicationId()
reportLauncherState(SparkAppHandle.State.SUBMITTED)
launcherBackend.setAppId(appId.toString)
// Set up the appropriate contexts to launch our AM
// 设置正确的上下文对象来启动 ApplicationMaster
val containerContext = createContainerLaunchContext(newAppResponse)
// 创建应用程序提交任务上下文
val appContext = createApplicationSubmissionContext(newApp, containerContext)
// 提交应用给 ResourceManager 启动 ApplicationMaster
// "org.apache.spark.deploy.yarn.ApplicationMaster"
yarnClient.submitApplication(appContext)
appId
} catch {
}
}
private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
: ContainerLaunchContext = {
val amClass =
if (isClusterMode) { // 如果是 Cluster 模式
Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
} else { // 如果是 Client 模式
Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}
amContainer
}
至此, SparkSubmit 进程启动完毕.
ApplicationMaster
进程CoarseGrainedExecutorBackend
进程标签:kpi one 上启 ssi 生产环境 main方法 chcon eth 源码分析
原文地址:https://www.cnblogs.com/bitbitbyte/p/12946181.html