码迷,mamicode.com
首页 > 其他好文 > 详细

Spark问题笔记2

时间:2015-08-26 18:00:53      阅读:368      评论:0      收藏:0      [点我收藏+]

标签:

1、学习Spark必须要深入理解RDD编程模型。为什么呢?

     RDD是Spark抽象的基石,整个Spark的编程都是基于对RDD的操作完成的。RDD(弹性分布式数据集,Resilient Distributed Datasets),其特性是只读的、可分区、容错的的数据集合;所谓弹性,指内存不够时,可以与磁盘进行交换(Spark是基于内存的),上述是Spark快的一个原因Spark快的另一个原因是其容错机制,基于DAG图,lineage是轻量级且高效的。RDD在代码中本质上相当于一个元数据结构存储数据分区及逻辑结构映射关系,存储着RDD之间的依赖转换关系。

Block Manager管理RDD的物理分区,每个Block就是节点上对应的一个数据块,可以存储在内存或磁盘。RDD的Partition是逻辑数据块,对应于Block Manager的物理块Block;

               技术分享

  RDD由以下几个主要部分组成:

    1)getPartitions() - 返回一系列的partition集合,一个RDD中有多个data partition

    2)getDependencies() - 返回的RDD依赖关系是一个Seq集合(分为窄依赖和宽依赖)

    3)compute(parition,TaskContext) - 对于给定的parition数据集,需要作哪些计算

    4)getDreferredLocations() -  对于data partition的位置偏好,partition的首选位置

    5)partitioner - 对于计算出来的数据结果如何分发,即可选的分区策略

   注:宽依赖:一个父RDD被多个子RDD引用,如groupByKey操作;多数是shuffle操作;但大多数时候是shuffle操作,因此Spark会将此Stage定义为ShuffleMapStage,以便于向MapOutputTracker注册shuffle操作。Spark通常将shuffle操作定义为stage的边界。

       窄依赖:一个父RDD最多被一个子RDD引用,如map,filter等操作;尽可能的将RDD转换放在同一个Stage中。

2、RDD操作

    RDD的操作分两种,分别为transformation和action。经Transformation处理之后,数据集中的内容会发生更改,由RRD A转换成为RDD B;而经Action处理之后,数据集中的内容会被归约为一个具体的数值。

  1)Transformation操作

  • map(func) :返回一个新的分布式数据集,由每个原元素经过func函数转换后组成
  • filter(func) : 返回一个新的数据集,由经过func函数后返回值为true的原元素组成
  • flatMap(func) : 类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)
  • sample(withReplacement, frac, seed) :
    根据给定的随机种子seed,随机抽样出数量为frac的数据
  • union(otherDataset) : 返回一个新的数据集,由原数据集和参数联合而成
  • groupByKey([numTasks]) :
    在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。注意:默认情况下,使用8个并行任务进行分组,你可以传入numTask可选参数,根据数据量设置不同数目的Task
  • reduceByKey(func, [numTasks]) : 在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一起。和groupbykey类似,任务的个数是可以通过第二个可选参数来配置的。
  • join(otherDataset, [numTasks]) :
    在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每个key中的所有元素都在一起的数据集
  • groupWith(otherDataset, [numTasks]) : 在类型为(K,V)和(K,W)类型的数据集上调用,返回一个数据集,组成元素为(K, Seq[V], Seq[W]) Tuples。这个操作在其它框架,称为CoGroup
  • cartesian(otherDataset) : 笛卡尔积。但在数据集T和U上调用时,返回一个(T,U)对的数据集,所有元素交互进行笛卡尔积。
  • flatMap(func) :
    类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)

  2)Action操作

  • reduce(func) : 通过函数func聚集数据集中的所有元素。Func函数接受2个参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行
  • collect() : 在Driver的程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作后,返回一个足够小的数据子集再使用,直接将整个RDD集Collect返回,很可能会让Driver程序OOM
  • count() : 返回数据集的元素个数
  • take(n) : 返回一个数组,由数据集的前n个元素组成。注意,这个操作目前并非在多个节点上,并行执行,而是Driver程序所在机器,单机计算所有的元素(Gateway的内存压力会增大,需要谨慎使用)
  • first() : 返回数据集的第一个元素(类似于take(1))
  • saveAsTextFile(path) : 将数据集的元素,以textfile的形式,保存到本地文件系统,hdfs或者任何其它hadoop支持的文件系统。Spark将会调用每个元素的toString方法,并将它转换为文件中的一行文本
  • saveAsSequenceFile(path) : 将数据集的元素,以sequencefile的格式,保存到指定的目录下,本地系统,hdfs或者任何其它hadoop支持的文件系统。RDD的元素必须由key-value对组成,并都实现了Hadoop的Writable接口,或隐式可以转换为Writable(Spark包括了基本类型的转换,例如Int,Double,String等等)
  • foreach(func) : 在数据集的每一个元素上,运行函数func。这通常用于更新一个累加器变量,或者和外部存储系统做交互

                                                 技术分享

3、RDD核心组件

    输入:在Spark程序运行中,加载外部存储数据(如分布式存储:textFile读取HDFS等,parallelize方法输入Scala集合或数据)创建RDD,通过BlockManager进行管理。
    RDD操作:通过transformation转换为新的RDD;通过Action触发Spark提交作业;还可以通过cache缓存数据到内存;
     输出:程序运行结束数据会输出Spark运行时空间,存储到分布式存储中(如saveAsTextFile输出到HDFS),或Scala数据或集合中(collect输出到Scala集合,count返回Scala int型数据)

                                   技术分享       
初始化SparkContext时会初始化一系列的内容,例如会查看内存的设置情况等,也会创建和启动scheduler
Block Tracker:Block和Partition对应关系的管理器;
Shuffle Tracker: 用于记录Shuffle操作过程细节;
每个Partition都会由一个Task负责运行,这些Task是并发的运行在Executor中。                       

例子:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

4、Spark作业调度系统

(Stage DAG:Spark提交Job之后,会把Job(每个Action对应一个Job)生成多个Stage,多个Stage之间是有依赖的,而这种依赖之间的关系构成了有向无环图DAG;)

                  技术分享

当Action作用某一RDD时候,这个action会调用sc.runJob方法作为某一Job被提交,在提交的过程中,由图可以知道:RDD Object产生DAG,然后进入DAGScheduler阶段,把DAG拆分成若干Stage(每组Stage由一组Task组成),DAGScheduler是面向Stage的高层次调度器,根据RDD依赖关系划分不同的Stage,并在每一个Stage内封装TaskSet,结合当前的缓存情况和数据就近读取的原则,将TaskSet提交给TaskScheduler;然后TaskScheduler将提交过来的TaskSet提交到集群中执行。

注:每一个Job被分为多个stage,划分stage的一个主要依据是当前计算因子的输入是否是确定的,如果是则将其分在同一个stage,避免多个stage之间的消息传递开销。








版权声明:本文为博主原创文章,未经博主允许不得转载。

Spark问题笔记2

标签:

原文地址:http://blog.csdn.net/feige1990/article/details/48002893

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!