标签:waiting 构建 wordcount 集群 接收 park launcher 执行 org
spark任务运行的源码分析
在整个spark任务的编写、提交、执行分三个部分:
① 编写程序和提交任务到集群中
②sparkContext的初始化
③触发action算子中的runJob方法,执行任务
(1)编程程序并提交到集群:
①编程spark程序的代码
②打成jar包到集群中运行
③使用spark-submit命令提交任务
在提交任务时,需要指定 --class 程序的入口(有main方法的类),
1) spark-submit --class xxx
2) ${SPARK_HOME}/bin/spark-class org.apache.spark.deploy.SparkSubmit $@
3) org.apache.spark.launcher.Main
submit(appArgs, uninitLog)
doRunMain()
runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
childMainClass:…./.WordCount (自己编写的代码的主类)
mainClass = Utils.classForName(childMainClass)
val app: SparkApplication = if() {} else {new JavaMainApplication(mainClass)}
app.start(childArgs.toArray, sparkConf) // 通过反射调用mainClass执行
// 到此为止,相当于调用了我们的自己编写的任务类的main方法执行了。!!!
val mainMethod = klass.getMethod("main", new ArrayString.getClass)
mainMethod.invoke(null, args)
④开始执行自己编写的代码
(2)初始化sparkContext:
当自己编写的程序运行到:new SparkContext()时,就开始了精妙而细致的sparkContext的初始化。
sparkContext的相关介绍:sparkContext是用户通往spark集群的唯一入口,可以用来在spark集群中创建RDD、累加器和广播变量。sparkContext也是整个spark应用程序的一个至关重要的对象,是整个应用程序运行调度的核心(不是资源调度的核心)。在初始化sparkContext时,同时的会初始化DAGScheduler、TaskScheduler和SchedulerBackend,这些至关重要的对象。
sparkContext的构建过程:
1)Driver端执行的代码:
初始化 TaskScheduler
初始化 SchedulerBackend
初始化 DAGScheduler
2)worker和master端执行的代码:
driver向master注册申请资源。
Worker负责启动executor。
(3)触发action算子中的runJob方法:
spark任务运行总结:
- 将编写的程序打成jar包
- 调用spark-submit提交任务到集群上运行
- 运行sparkSubmit 的main方法,在这个方法中通过反射的方式创建我们编写的主类的实例对象,然后调用该对象的main方法,开始执行我们编写的代码
- 当代码运行到new SparkContext对象的的时候,就开始了复杂和精致的sparkContext对象的初始化
- 在初始化SparkContext对象的时候,会创建两个特别重要的对象,分别是:DAGScheduler 和 TaskScheduler,其中【DAGScheduler 的作用】将RDD的依赖切成一个一个的stage,然后stage作为taskSet提交给Taskscheduler。
- 在构建TaskScheduler的同时,会创建两个非常重要的对象,分别是 DriverActor 和 ClientActor,DriverActor负责接收executor的反向注册,将任务提交给executor运行,clientActor是负责向master注册并提交任务
- 当clientActor启动时,会将用户提交的任务相关的参数分装到applicationDescription对象中去,然后提交给master进行任务注册
- 当master接收到clientActor提交的任务请求时,会将请求的参数进行分析,并封装成application,然后将其持久化,然后将其加入到任务队列waitingApps中。
- 当轮到我们提交任务的时候,就开始执行schedule(),进行任务资源的调度
- worker接收到master发送来的launchExecutor 时,会将其解压并封装到ExecutorRunner中,然后调用这个对象的start方法,启动executor
- executor启动后会向driver反向注册
- driver会发送注册成功信息,给executor
- executor接收到driver actor注册成功信息后,就会创建一个线程池,用于执行driveractor发送过来的任务
- 当属于这个任务的所有的 Executor 启动并反向注册成功后,就意味着运行这个任务的 环境已经准备好了,driver 会结束 SparkContext 对象的初始化,也就意味着 new SparkContext 这句代码运行完成
- 当sparkContext初始化完成之后,就会继续运行我们的代码,直到运行到action算子时,也就意味着触发了一个job的提交
- driver 会将这个 job 提交给 DAGScheduler
- DAGScheduler将接收到的job,从最后一个算子开始推导,将DAG根据依赖关系划分成为一个个stage,然后将stage封装成一个taskSet,并将taskSet中的task提交给taskScheduler
- taskScheduler接收到DAGScheduler发送过来的task,会拿到一个序列化器,对task进行序列化,然后将序列化好的task封装到launchTask中,然后将launchTask发送给指定的executor中运行
- executor接收到了DriverActor 发送过来的launchTask 时,会拿到一个反序列化器,对launchTask 进行反序列化,封装到一个TaskRunner 中,然后从executor这个线程池中获取一个线程,将反序列化好的任务中的算子作用在RDD对应的分区上。
- 最终当所有的task任务完成之后,整个application执行完成,关闭sparkContext对象。
spark任务运行过程的源码分析
标签:waiting 构建 wordcount 集群 接收 park launcher 执行 org
原文地址:http://blog.51cto.com/14048416/2338482