标签:apache link isp get nal strong comm wrapper error
sparkR在spark2.0里面,RDD后端代码位于org.apache.spark.rdd中,R语言相关的位于org.apache.spark.api.r中。
从入口开始,./bin/sparkR里面只有四句话,调用的是这个
exec "$SPARK_HOME"/bin/spark-submit sparkr-shell-main "$@"
spark-submit里面是个一句话的shell脚本
exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
好了,入口是org.apache.spark.deploy.SparkSubmit这个类,该类中的main方法中调用具体方法
case SparkSubmitAction.SUBMIT => submit(appArgs) /**? * Submit the application using the provided parameters.? *? * This runs in two steps. First, we prepare the launch environment by setting up? * the appropriate classpath, system properties, and application arguments for? * running the child main class based on the cluster manager and the deploy mode.? * Second, we use this launch environment to invoke the main method of the child? * main class.? */ ?private def submit(args: SparkSubmitArguments): Unit = {
submit方法准备classpath、系统属性、运行参数,然后按照这些调用下面的方法运行
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
该方法主要两步,第一步调用下面方法进行准备
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
第二部会调用
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
进行执行。
在第一步中将sparkR的R相关代码打包成zip文件,然后设置将要运行的主类
如果是SPARKR-SHELL则调用org.apache.spark.api.r.RBackend
如果是纯粹client模式,则调用org.apache.spark.deploy.RRunner,其调用形式如下,例如
Usage: RRunner <main R file> [app arguments] sun.java.command=com.aliyun.odps.cupid.runtime.Main --class org.apache.spark.deploy.RRunner --primary-r-file testOdpsRdd.R --arg testOdpsRdd.R?
RBackend基于netty用来在R和java之间的通讯
Runner里面会调用启动RBackend,然后启动processBuilder去执行R脚本,也就是这句话:
new ProcessBuilder(Seq(rCommand, rFileNormalized) ++ otherArgs)
如何让spark worker识别sparkR代码呢?在R语言中变量R_PROFILE_USER ,用来初始化R运行环境,sparkR相关代码被打包提交到计算集群以后,在计算节点上面首先设置这个数值指向到初始化脚本${SPARK_HOME}/sparkr/SparkR/profile/general.R,这个脚本中识别路径,并且把解压后sparkR的代码安装到当前R环境中。下面是其代码
.First <- function() { packageDir <- Sys.getenv("SPARKR_PACKAGE_DIR") .libPaths(c(packageDir, .libPaths())) Sys.setenv(NOAWT=1) }
下面的代码来自于prepareSubmitEnvironment
// In YARN mode for an R app, add the SparkR package archive to archives? // that can be distributed with the job ?if (args.isR && clusterManager == YARN) {? val rPackagePath = RUtils.localSparkRPackagePath? if (rPackagePath.isEmpty) {? printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.")? }? val rPackageFile =? RPackageUtils.zipRLibraries(new File(rPackagePath.get), SPARKR_PACKAGE_ARCHIVE)? if (!rPackageFile.exists()) {? printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")? }? val localURI = Utils.resolveURI(rPackageFile.getAbsolutePath)?? // Assigns a symbol link name "sparkr" to the shipped package.? args.archives = mergeFileLists(args.archives, localURI.toString + "#sparkr")? }? ?// If we‘re running a R app, set the main class to our specific R runner ?if (args.isR && deployMode == CLIENT) {? if (args.primaryResource == SPARKR_SHELL) {? args.mainClass = "org.apache.spark.api.r.RBackend" ? } else {? // If a R file is provided, add it to the child arguments and list of files to deploy.? // Usage: RRunner <main R file> [app arguments]? args.mainClass = "org.apache.spark.deploy.RRunner"? args.childArgs = ArrayBuffer(args.primaryResource) ++ args.childArgs? args.files = mergeFileLists(args.files, args.primaryResource)? } ?}?? if (isYarnCluster && args.isR) {? // In yarn-cluster mode for a R app, add primary resource to files? // that can be distributed with the job? args.files = mergeFileLists(args.files, args.primaryResource) ?}
对于普通scala/java作业,standalone情况下直接调用下面类
// In legacy standalone cluster mode, use Client as a wrapper around the user
class?childMainClass = "org.apache.spark.deploy.Client"
在client模式下直接提交用户应用主类运行,这里的主类如果是SPARKR_SHELL的话就是org.apache.spark.api.r.RBackend
直接提交文件执行则调用org.apache.spark.deploy.RRunner
?// In client mode, launch the application main class directly ?// In addition, add the main application jar and any added jars (if any) to the classpath ?if (deployMode == CLIENT) { ? childMainClass = args.mainClass? if (isUserJar(args.primaryResource)) {? childClasspath += args.primaryResource? }? if (args.jars != null) { childClasspath ++= args.jars.split(",") }? if (args.childArgs != null) { childArgs ++= args.childArgs }? }
在yarnCluster模式调度情况下,使用org.apache.spark.deploy.yarn.Client
这个类包装用户的类进行提交
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class? if (isYarnCluster) { ? childMainClass = "org.apache.spark.deploy.yarn.Client" ? if (args.isPython) { ? childArgs += ("--primary-py-file", args.primaryResource)? if (args.pyFiles != null) { ? childArgs += ("--py-files", args.pyFiles)? }? childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")? } else if (args.isR) { ? val mainFile = new Path(args.primaryResource).getName? childArgs += ("--primary-r-file", mainFile)? childArgs += ("--class", "org.apache.spark.deploy.RRunner")? } else {? if (args.primaryResource != SPARK_INTERNAL) { ? childArgs += ("--jar", args.primaryResource)? }? childArgs += ("--class", args.mainClass)? }? if (args.childArgs != null) { ? args.childArgs.foreach { arg => childArgs += ("--arg", arg) } ? } ?}
org.apache.spark.deploy.yarn.Client
Py调用spark过程,在python/pyspark/context.py下面存在
class SparkContext(object)
其中的_jvm成员作为py4j的调用存在,其初始化
233 if not SparkContext._gateway: 234 SparkContext._gateway=gateway or launch_gateway() 235 SparkContext._jvm=SparkContext._gateway.jvm
其调用后端方法
207 # Create a temporary directory inside spark.local.dir: 208 local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf()) 209 self._temp_dir = 210 self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir, "pyspark") 211 .getAbsolutePath()
标签:apache link isp get nal strong comm wrapper error
原文地址:http://www.cnblogs.com/laodageblog/p/sparkR.html