码迷,mamicode.com
首页 > 其他好文 > 详细

RDD编程

时间:2018-03-28 12:25:42      阅读:155      评论:0      收藏:0      [点我收藏+]

标签:使用   运行   定义   类型   lis   分发   创建   arch   gpo   

       RDD,即弹性分布式数据集,也就是分布式的元素集合。在Spark中,对数据的所有操作不外乎创建RDD、转化已有的RDD以及调用RDD操作进行求值。在这一切的背后,Spark会自动将RDD中的数据分发到集群上,并将操作并行化执行。

       RDD支持两种类型的操作:转化操作和行动操作。转换操作会由一个RDD生成一个新的RDD;行动操作会对RDD计算出一个结果。

       Spark程序一般都按照如下方式工作。

(1)      从外部数据创建出输入RDD。

(2)      使用诸如filter()这样的转化操作对RDD进行转化,以定义新的RDD。

(3)      告诉Spark对需要被重用的中间结果RDD执行persist()操作,将RDD持久化到内存中

(4)      使用行动操作(例如count()和first()等等)来触发一次并行计算,Spark会对计算进行优化后再执行。

 

那么,我们先尝试一下吧。打开终端,由于配置好环境变量了,我们可以在任何地方执行spark-shell命令。默认进入Scala Shell中。

 

 

       但是,如果对于Scala不熟悉,我们可以使用Python Shell,我们可以键入”pyspark”,效果如下:

 

 

       为了避免分心,我只使用Scala Shell来进行练习。

 

(1)      创建RDD。Spark提供了两种创建RDD的方式:读取外部数据集,在驱动程序中对一个集合进行并行化。

第一种,从外部读取数据集,路径可以相对路径和绝对路径,在此不用过多纠结。(注:在Shell中,默认sc为创建好的SparkContext,即为Spark的连接。)

 

       第二种,对一个集合进行并行化。

 

(2)      RDD操作:转换操作

val inputRDD = sc.textFile(“app/log.txt”)

val errorsRDD = inputRDD.filter(line => line.contains(“error”))

(3)      RDD操作:行动操作

 

运行结果

 

我们可以取出前3行结果

 

(补充:转化操作是惰性求值的,如调用map()时,操作不会立即执行,而是在必要的时候才会执行。但我们可以随时通过一个行动操作来强制使得Spark执行RDD的转化操作,如使用count()。)

(4)      向Spark传递函数

class SearchFunctions(val query: String) {

  def isMatch(s: String): Boolean = {

         s.contains(query)

  }

  def getMatchesFunctionReference(rdd: RDD[String]): RDD[String] = {

         rdd.map(isMatch)

  }

  def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = {

         rdd.map(x => x.split(query))

  }

  def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {

         val query_ = this.query

         rdd.map(x => x.split(query_))

  }

}

(5)      常见的转化操作和行动操作

1.针对各个元素得转化操作,常用的有map()和filter():映射和筛选。

例子:计算RDD中各个值得平方

 

 

2.伪集合操作

尽管RDD本身不是严格意义上得集合,但它也支持许多数学上的集合操作,比如合并和相交。如下图,一些简单的集合操作。

 

3.行动操作

最常见的行动操作reduce(),它接受一个函数作为参数,这个函数将连个RDD元素类型的数据返回一个同样类型的新元素。

例子:求和,求积

val input = sc.parallelize(List(2,3,4,5))

 

除此之外还有fold和aggregate。fold需要提供初始值,aggregate可以返回与所操作的RDD类型不同的类型。

 

       (6)在Scala中,在不同RDD类型之间的转换时隐式处理的,Java特殊类型之间的转换更加明确,拥有特殊的类来处理专门的RDD,如JavaDoubleRDD和JavaPairRDD。

       (7)持久化(缓存)

       SparkRDD时惰性求值的,这在迭代算法中消耗格外地大,为了避免多次计算同一个RDD,可以让Spark对数据进行持久化。在Scala和Java中,persist默认会把数据以序列化的形式缓存在JVM的堆空间中。

      

RDD编程

标签:使用   运行   定义   类型   lis   分发   创建   arch   gpo   

原文地址:https://www.cnblogs.com/xhdp5433/p/8662652.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!