RDD,即弹性分布式数据集,也就是分布式的元素集合。在Spark中,对数据的所有操作不外乎创建RDD、转化已有的RDD以及调用RDD操作进行求值。在这一切的背后,Spark会自动将RDD中的数据分发到集群上,并将操作并行化执行。
RDD支持两种类型的操作:转化操作和行动操作。转换操作会由一个RDD生成一个新的RDD;行动操作会对RDD计算出一个结果。
Spark程序一般都按照如下方式工作。
(1) 从外部数据创建出输入RDD。
(2) 使用诸如filter()这样的转化操作对RDD进行转化,以定义新的RDD。
(3) 告诉Spark对需要被重用的中间结果RDD执行persist()操作,将RDD持久化到内存中
(4) 使用行动操作(例如count()和first()等等)来触发一次并行计算,Spark会对计算进行优化后再执行。
那么,我们先尝试一下吧。打开终端,由于配置好环境变量了,我们可以在任何地方执行spark-shell命令。默认进入Scala Shell中。
但是,如果对于Scala不熟悉,我们可以使用Python Shell,我们可以键入”pyspark”,效果如下:
为了避免分心,我只使用Scala Shell来进行练习。
(1) 创建RDD。Spark提供了两种创建RDD的方式:读取外部数据集,在驱动程序中对一个集合进行并行化。
第一种,从外部读取数据集,路径可以相对路径和绝对路径,在此不用过多纠结。(注:在Shell中,默认sc为创建好的SparkContext,即为Spark的连接。)
第二种,对一个集合进行并行化。
(2) RDD操作:转换操作
val inputRDD = sc.textFile(“app/log.txt”)
val errorsRDD = inputRDD.filter(line => line.contains(“error”))
(3) RDD操作:行动操作
运行结果
我们可以取出前3行结果
(补充:转化操作是惰性求值的,如调用map()时,操作不会立即执行,而是在必要的时候才会执行。但我们可以随时通过一个行动操作来强制使得Spark执行RDD的转化操作,如使用count()。)
(4) 向Spark传递函数
class SearchFunctions(val query: String) {
def isMatch(s: String): Boolean = {
s.contains(query)
}
def getMatchesFunctionReference(rdd: RDD[String]): RDD[String] = {
rdd.map(isMatch)
}
def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = {
rdd.map(x => x.split(query))
}
def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {
val query_ = this.query
rdd.map(x => x.split(query_))
}
}
(5) 常见的转化操作和行动操作
1.针对各个元素得转化操作,常用的有map()和filter():映射和筛选。
例子:计算RDD中各个值得平方
2.伪集合操作
尽管RDD本身不是严格意义上得集合,但它也支持许多数学上的集合操作,比如合并和相交。如下图,一些简单的集合操作。
3.行动操作
最常见的行动操作reduce(),它接受一个函数作为参数,这个函数将连个RDD元素类型的数据返回一个同样类型的新元素。
例子:求和,求积
val input = sc.parallelize(List(2,3,4,5))
除此之外还有fold和aggregate。fold需要提供初始值,aggregate可以返回与所操作的RDD类型不同的类型。
(6)在Scala中,在不同RDD类型之间的转换时隐式处理的,Java特殊类型之间的转换更加明确,拥有特殊的类来处理专门的RDD,如JavaDoubleRDD和JavaPairRDD。
(7)持久化(缓存)
SparkRDD时惰性求值的,这在迭代算法中消耗格外地大,为了避免多次计算同一个RDD,可以让Spark对数据进行持久化。在Scala和Java中,persist默认会把数据以序列化的形式缓存在JVM的堆空间中。