码迷,mamicode.com
首页 > 其他好文 > 详细

RDDs基本操作、RDDs特性、KeyValue对RDDs

时间:2017-07-28 20:51:04      阅读:271      评论:0      收藏:0      [点我收藏+]

标签:不同   yellow   inpu   width   section   groupby   合并   集合   系统   

摘要:RDD是Spark中极为重要的数据抽象,这里总结RDD的概念,基本操作Transformation(转换)与Action,RDDs的特性,KeyValue对RDDs的Transformation(转换)。

1.RDDs是什么

Resilient distributed datasets(弹性分布式数据集) 。RDDs并行的分布在整个集群中,是Spark分发数据和计算的基础抽象类,一个RDD是一个不可改变的分布式集合对象,Spark中,所有的计算都是通过RDDs的创建,转换操作完成的,一个RDD内部由许多partitions(分片)组成。

分片:每个分片包括一部分数据,partitions可在集群不同节点上计算;分片是Spark并行处理的单元,Spark顺序的,并行的处理分片。

2.RDDs的创建

(1)   把一个存在的集合传给SparkContext的parallelize()方法,测试用
  val rdd=sc.parallelize(Array(1,2,3,4),4)
  第一个参数:待并行化处理的集合;第二个参数:分片个数

(2)  加载外部数据集
  val rddText=sc.textFile("hellospark.txt")

3.RDD基本操作之Transformation(转换)

从之前的RDD构建一个新的RDD,像map()和filter()

(1)逐元素Transformation
map():   接收函数,把函数应用到RDD的每一个元素,返回新RDD。

  val lines=sc.parallelize(Array("hello","spark","hello","world","!"))
  val lines2=lines.map(word=>(word,1))
  lines2.foreach(println)
  //结果:
  (hello,1)
  (spark,1)
  (hello,1)
  (world,1)
  (!,1)

filter():   接收函数,返回只包含满足filter()函数的元素的新RDD。 

  val lines=sc.parallelize(Array("hello","spark","hello","world","!"))
  val lines3=lines.filter(word=>word.contains("hello"))
  lines3.foreach(println)
  //结果:   
  hello
  hello

flatMap(): 对每个输入元素,输出多个输出元素。flat压扁的意思,将RDD中元素压扁后返回一个新的RDD。

    val inputs=sc.textFile("/home/lucy/hellospark.txt")
    val lines=inputs.flatMap(line=>line.split(" "))
    lines.foreach(println)
    //结果
    hello
    spark
    hello
    world
    hello
    !
    //文件内容/home/lucy/hellospark.txt 
    hello spark
    hello world
    hello !

(2)集合运算

RDDs支持数学集合的计算,例如并集,交集计算

    val rdd1=sc.parallelize(Array("red","red","blue","black","white"))
    val rdd2=sc.parallelize(Array("red","grey","yellow"))

    //去重:
    val rdd_distinct=rdd1.distinct()
    //去重结果:
    white
    blue
    red
    black

    //并集:
    val rdd_union=rdd1.union(rdd2)
    //并集结果:
    red
    blue
    black
    white
    red
    grey
    yellow

    //交集:
    val rdd_inter=rdd1.intersection(rdd2)
    //交集结果:
    red

    //包含:
    val rdd_sub=rdd1.subtract(rdd2)
    //包含结果:
    blue
    white
    black

4.RDD基本操作之Action

在RDD上计算出来一个结果。把结果返回给driver program或保存在文件系统,count(),save。

函数名              功能                       例子                                       结果
collect()            返回RDD的所有元素           rdd.collect()          {1,2,3,3}
count()                  计数                     rdd.count()                    4
countByValue()          返回一个map表示唯一元素出现的个数      rdd.countByValue()              {(1,1),(2,1),(3,2)}
take(num)                  返回几个元素                   rdd.take(2)                       {1,2}
top(num)                    返回前几个元素                    rdd.top(2)                     {3,3}
takeOrdered                            返回基于提供的排序算法的前几个元素          rdd.takeOrdered(2)(myOrdering) {3,3}
(num)(ordering)
takeSample                             取样例                        rdd.takeSample(false,1)          不确定
(withReplacement,num,[seed])                           
reduce(func)                  合并RDD中元素                    rdd.reduce((x,y)=>x+y)      9
fold(zero)(func)                  与reduce()相似提供zero value            rdd.fold(0)((x,y)=>x+y)               9
foreach(func)                      对RDD的每个元素作用函数,什么也不返回   rdd.foreach(func)             无

5.RDDs的特性

1.血统关系图:
        Spark维护着RDDs之间的依赖关系和创建关系,叫做血统关系图,Spark使用血统关系图来计算每个RDD的需求和恢复丢失的数据
2.延迟计算
       Spark对RDDs的计算是,他们第一次使用action操作的时候。Spark内部记录metadata表明transformations操作已经被响应了。加载数据也是延迟计算,数据只有          在必要的时候,才会被加载进去。
3.RDD.persist():
       默认每次在RDDs上面进行action操作时,Spark都重新计算RDDs。如果想重复利用一个RDD,可以使用RDD.persist()。upersist()方法从缓存中移除。

      技术分享

6.KeyValue对RDDs

创建KeyValue对RDDs:

val rdd3=sc.parallelize(Array((1,2),(3,4),(3,6)))

KeyValue对RDDs的Transformation(转换):

(1)reduceByKey(func)    把相同key的结合 

  val rdd4=rdd3.reduceByKey((x,y)=>x+y)
  //结果
  (1,2)
  (3,10)

(2)groupByKey      把相同的key的values分组

  val rdd5=rdd3.groupByKey()
  //结果
  (1,CompactBuffer(2))
  (3,CompactBuffer(4, 6)) 

(3)mapValues()    函数作用于pairRDD的每个元素,key不变

  val rdd6=rdd3.mapValues(x=>x+1)
  //结果
  (1,3)
  (3,5)
  (3,7)

(4)keys/values
  rdd3.keys.foreach(println)
  1
  3
  3

  rdd3.values.foreach(println)
  2
  4
  6

(5)sortByKey
  val rdd7=rdd3.sortByKey()
  //结果
  (1,2)
  (3,4)
  (3,6)

(6)combineByKey():     (createCombiner,mergeValue,mergeCombiners,partitioner)

  最常用的基于key的聚合函数,返回的类型可以与输入类型不一样。许多基于key的聚合函数都用到了它,像groupByKey()

  原理:遍历partition中的元素,元素的key,要么之前见过的,要么不是。如果是新元素,使用我们提供的createCombiner()函数,如果是这个partition中已经存在的key,就会使用mergeValue()函数,合计每个partition的结果的时候,使用mergeCombiner()函数

  例子:求平均值

  val score=sc.parallelize(Array(("Tom",80.0),("Tom",90.0),("Tom",85.0),("Ben",85.0),("Ben",92.0),("Ben",90.0)))
  val score2=score.combineByKey(score=>(1,score),(c1:(Int,Double),newScore)=>(c1._1+1,c1._2+newScore),(c1:(Int,Double),c2:(Int,Double))=>(c1._1+c2._1,c1._2+c2._2))
  //结果
  (Ben,(3,267.0))
  (Tom,(3,255.0))
val average
=score2.map{case(name,(num,score))=>(name,score/num)}   //结果   (Ben,89.0)   (Tom,85.0)

 

RDDs基本操作、RDDs特性、KeyValue对RDDs

标签:不同   yellow   inpu   width   section   groupby   合并   集合   系统   

原文地址:http://www.cnblogs.com/wonglu/p/7252440.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!