标签:
RDD是Spark抽象的基石,整个Spark的编程都是基于对RDD的操作完成的。RDD(弹性分布式数据集,Resilient Distributed Datasets),其特性是只读的、可分区、容错的的数据集合;所谓弹性,指内存不够时,可以与磁盘进行交换(Spark是基于内存的),上述是Spark快的一个原因。Spark快的另一个原因是其容错机制,基于DAG图,lineage是轻量级且高效的。RDD在代码中本质上相当于一个元数据结构,存储数据分区及逻辑结构映射关系,存储着RDD之间的依赖转换关系。
Block Manager管理RDD的物理分区,每个Block就是节点上对应的一个数据块,可以存储在内存或磁盘。RDD的Partition是逻辑数据块,对应于Block Manager的物理块Block;
RDD由以下几个主要部分组成:
1)getPartitions() - 返回一系列的partition集合,一个RDD中有多个data partition
2)getDependencies() - 返回的RDD依赖关系是一个Seq集合(分为窄依赖和宽依赖)
3)compute(parition,TaskContext) - 对于给定的parition数据集,需要作哪些计算
4)getDreferredLocations() - 对于data partition的位置偏好,partition的首选位置
5)partitioner - 对于计算出来的数据结果如何分发,即可选的分区策略
注:宽依赖:一个父RDD被多个子RDD引用,如groupByKey操作;多数是shuffle操作;但大多数时候是shuffle操作,因此Spark会将此Stage定义为ShuffleMapStage,以便于向MapOutputTracker注册shuffle操作。Spark通常将shuffle操作定义为stage的边界。
窄依赖:一个父RDD最多被一个子RDD引用,如map,filter等操作;尽可能的将RDD转换放在同一个Stage中。
RDD的操作分两种,分别为transformation和action。经Transformation处理之后,数据集中的内容会发生更改,由RRD A转换成为RDD B;而经Action处理之后,数据集中的内容会被归约为一个具体的数值。
1)Transformation操作
2)Action操作
输入:在Spark程序运行中,加载外部存储数据(如分布式存储:textFile读取HDFS等,parallelize方法输入Scala集合或数据)创建RDD,通过BlockManager进行管理。
RDD操作:通过transformation转换为新的RDD;通过Action触发Spark提交作业;还可以通过cache缓存数据到内存;
输出:程序运行结束数据会输出Spark运行时空间,存储到分布式存储中(如saveAsTextFile输出到HDFS),或Scala数据或集合中(collect输出到Scala集合,count返回Scala
int型数据);
例子:
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
当Action作用某一RDD时候,这个action会调用sc.runJob方法作为某一Job被提交,在提交的过程中,由图可以知道:RDD Object产生DAG,然后进入DAGScheduler阶段,把DAG拆分成若干Stage(每组Stage由一组Task组成),DAGScheduler是面向Stage的高层次调度器,根据RDD依赖关系划分不同的Stage,并在每一个Stage内封装TaskSet,结合当前的缓存情况和数据就近读取的原则,将TaskSet提交给TaskScheduler;然后TaskScheduler将提交过来的TaskSet提交到集群中执行。
注:每一个Job被分为多个stage,划分stage的一个主要依据是当前计算因子的输入是否是确定的,如果是则将其分在同一个stage,避免多个stage之间的消息传递开销。
版权声明:本文为博主原创文章,未经博主允许不得转载。
标签:
原文地址:http://blog.csdn.net/feige1990/article/details/48002893