码迷,mamicode.com
首页 > 其他好文 > 详细

Spark 算子实现

时间:2016-04-21 16:40:02      阅读:156      评论:0      收藏:0      [点我收藏+]

标签:

1.map,flatmap,filter用的是scala的内部实现。

2.cogroup,intersection,join,leftOuterJoin,rightOuterJoin,fullOuterJoin

rdd1:[(1,2,3),(2,3,4)]

rdd2:[(1,3,5),(2,4,6)]

rdd1.cogroup(rdd2)

对rdd1调用cogroup: rdd1->cogroup(rdd2)->CoGroupRDD(rdd1,rdd2)->mapValues()->MapPartitionsRDD

cogroup首先会用rdd1和rdd2来new一个CoGroupRDD,然后在对这个CoGroupRDD调用mapValues生成MapPartitionsRDD。

 

2.1intersection的实现

   map()->MapPartitionsRDD->cogroup()->CoGroupRDD->mapvalues()->MapPartitionsRDD->filter()->MapPartitionsRDD->keys()

先用map操作,将key转换为<key,null>,因为cogroup只对key-value做处理。然后在对<key,null>进行cogroup算子操作,最后筛选出非空的元素。

2.2join的实现

5.combineByKey

reduceByKey

groupByKey

 

每一个RDD由如下几个部分组成:

1.Partition[]

2.compute(Parition,context)

3.Dependency[]

4.prefreed locations

5.Partitioner.

 

Dependency

RDD之间的依赖:一个RDD可以依赖于一个或者多个RDD。

Partition之间的依赖:一个RDD可能依赖一个或者多个Partition

每一个RDD都有一个Dependency[],Dependency有如下几种:OneToOneDependency,RangeDependency,NarrowDependency,ShuffleDependency。

 

map操作生成的MapPartitionsRDD里的Partition,依赖于父RDD中的一个Partition。1:1关系。

reduceByKey生成的ShuffleRDD里的Partition,依赖于父RDD中的所有Partition。N:1

 

 

RDD依赖关系的建立

rdd1->rdd2->rdd3.

RDD的物理执行

task.run():rdd.iterator(parttion,context),如果使用了本地化级别,那么久去缓存里面找。如果使用了checkpoint,就使用它。否则的就调用rdd.compute(partition)来计算了。compute是一个抽象方法,具体实现在派生的RDD中。

我们来看一下这两个方法:

  /**
   * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
   * This should ‘‘not‘‘ be called by users directly, but is available for implementors of custom
   * subclasses of RDD.
   */
  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

 

  def compute(split: Partition, context: TaskContext): Iterator[T]

 

iterator(partition)对partition中的record进行计算,最后返回一个Iterator迭代器,也就是说呢,他将结果抽象成一个scala容器了。compute也是如此。

现在的问题是,compute如何对partition中的数据进行计算,并生成Iterator呢。我们以map操作为例:firstParent.iterator(partition,context)拿到父RDD

的计算结果(Iterator),然后再对这个Iterator调用map来生成结果。

比如rdd1.map(f)这个操作,map里面首先生成MapPartitionsRDD。然后当执行MapPartitionsRDD的compute(partition)时,就会执行如下操作:

1.firstParent.iterator(partition)拿到父RDD也就是rdd1的结算结果,这个结果是一个Iterator

2.f(Iterator),这个f就是用户向map传递的那个闭包函数。

 

Spark 算子实现

标签:

原文地址:http://www.cnblogs.com/francisYoung/p/5417443.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!