码迷,mamicode.com
首页 > 其他好文 > 详细

Spark(1.0) 内核解析

时间:2016-03-17 21:41:13      阅读:266      评论:0      收藏:0      [点我收藏+]

标签:

Spark的内核部分主要从以下几个方面介绍:

  任务调度系统、I/0模块、通信控制模块、容错模块、shuffle模块

一、任务调度系统

1、作业执行流程

接下来注意几个概念:

  Application:用户自定义的Spark程序,用户提交后,Spark为App分配资源,将程序转换并执行。

  Driver Program:运行Application的main()函数并创建SparkContext

  RDD Graph:RDD是Spark的核心结构,可以通过一系列算子进行操作(主要有Transformation和Action操作)。当RDD遇到Action算子时,将之前的所有算子形成一个有向无环图(DAG),再在Spark中转化为Job,提交到集群执行。一个App中可以包含多个Job。

  Job:一个RDD Graph触发的作业,往往由Spark Action算子触发,在SparkContext中通过runJob方法向Spark提交Job。

  Stage:每个Job会根据RDD的宽依赖关系被切分成很多Stage,每个Stage中包含一组相同的Task,叫TaskSet

  Task:一个分区对应一个Task,Task执行RDD中对应Stage中包含的算子,Task封装好之后放入Executor的线程池中执行。

技术分享

作业执行流程:

  1)用户启动客户端,之后客户端运行用户程序,启动Driver进程。在Driver中启动或实例化DAGScheduler等组件。客户端的Driver向Master注册。

  2)Worker向Master注册(要确保master有活节点可用,或通过心跳机制向master汇报worker节点还活着),Master命令Worker启动Exeuctor。Worker通过创建ExecutorRunner线程,在ExecutorRunner线程内部启动ExecutorBackend进程。

  3)ExecutorBackend启动后,向客户端Driver进程内的SchedulerBackend注册。而SchedulerBackend收到后,会向Worker启动LaunchTask进程,Worker开始执行任务。

技术分享

  4)Driver中的SchedulerBackend进程中包含DAGScheduler进程、TaskScheduler进程。SchedulerBackend收到注册后,DAGScheduler会根据RDD DAG切分成相应的Stage,每个Stage中包含的TaskSet通过TaskScheduler分配给Executor。Executor启动线程池并行化执行Task。

 2、Spark的任务调度系统

技术分享

  1)在最左边通过一系列的Transformation算子和Actions算子,形成DAG图,传递给DAGScheduler,

  2)然后DAGScheduler把其切分成一系列的Stage,即形成TaskSet任务集合,每个TaskSet中包含多个任务。然后把TaskSet传递给TaskScheduler

  3)TaskScheduler收到后,然后把具体任务分配给相应的Worker节点进行计算。

3、以wordcount为例,解析触发Job全生命周期过程

sc.textFile(hdfs://....).flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey(_+_).saveAsFile(hdfs://...)
化简为:
sc.textFile(hdfs://....).flatMap(_.split(" ")).map(_,1).reduceByKey(_+_).saveAsFile(hdfs://...)
因为默认得出的结果是没有序的,所以想要得到排序的结果:
sc.textFile(hdfs://....).flatMap(_.split(" ")).map(_,1).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).saveAsFile(hdfs://...)

 上述代码经过了HadoopRDD-->MappedRDD-->flatMappedRDD-->MappedRDD-->PairRDDFunctions-->ShuffledRDD-->MappartitionsRDD-->MappedRDD-->SortedRDD-->MappedRDD-->HadoopRDD

HadoopRDD-->MappedRDD:  sc.textFile(hdfs://....)
MappedRDD-->flatMappedRDD: flatMap(_.split(" "))
flatMappedRDD-->MappedRDD: flatMap(_.split(" ")).map(_,1)
MappedRDD-->PairRDDFunctions-->ShuffledRD: map(_,1).reduceByKey(_+_)
ShuffledRDD-->MappartitionsRDD-->MappedRDD:reduceByKey(_+_).map(x=>(x._2,x._1))
MappedRDD-->SortedRDD: map(x=>(x._2,x._1)).sortByKey(false)
SortedRDD-->MappedRDD: sortByKey(false).map(x=>(x._2,x._1))
MappedRDD-->HadoopRDD: map(x=>(x._2,x._1)).saveAsFile(hdfs://...)

 

Spark(1.0) 内核解析

标签:

原文地址:http://www.cnblogs.com/liuzhongfeng/p/5289169.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!