-- 筛选 val rdd = sc.parallelize(List("ABC","BCD","DEF")) val filtered = rdd.filter(_.contains("C")) filtered.collect() Result: Array[String] = Array(ABC, BCD) -- 相乘 val rdd=sc.parallelize(List(1,2,3,4,5)) val times2 = rdd.map(_*2) times2.collect() Result: Array[Int] = Array(2, 4, 6, 8, 10) -- 分割 val rdd=sc.parallelize(List("Spark is awesome","It is fun")) val fm=rdd.flatMap(str=>str.split(" ")) fm.collect() Result: Array[String] = Array(Spark, is, awesome, It, is, fun) -- 频数 val word1=fm.map(word=>(word,1)) val wrdCnt=word1.reduceByKey(_+_) wrdCnt.collect() Result: Array[(String, Int)] = Array((is,2), (It,1), (awesome,1), (Spark,1), (fun,1)) -- 交换 val cntWrd = wrdCnt.map{case (word, count) => (count, word)} cntWrd.groupByKey().collect() Result: Array[(Int, Iterable[String])] = Array((1,ArrayBuffer(It, awesome, Spark, fun)), (2,ArrayBuffer(is))) -- 排重 fm.distinct().collect() Result: Array[String] = Array(is, It, awesome, Spark, fun) -- 并集 val rdd1=sc.parallelize(List(‘A‘,‘B‘)) val rdd2=sc.parallelize(List(‘B‘,‘C‘)) rdd1.union(rdd2).collect() -- 交集 rdd1.intersection(rdd2).collect() -- 笛卡尔积 rdd1.cartesian(rdd2).collect() -- 相减 rdd1.subtract(rdd2).collect() -- 连接 val personFruit = sc.parallelize(Seq(("Andy", "Apple"), ("Bob", "Banana"), ("Charlie", "Cherry"), ("Andy","Apricot"))) val personSE = sc.parallelize(Seq(("Andy", "Google"), ("Bob", "Bing"), ("Charlie", "Yahoo"), ("Bob","AltaVista"))) personFruit.join(personSE).collect() Result: Array[(String, (String, String))] = Array((Andy,(Apple,Google)), (Andy,(Apricot,Google)), (Charlie,(Cherry,Yahoo)), (Bob,(Banana,Bing)), (Bob,(Banana,AltaVista))) -- 计数 val rdd = sc.parallelize(list(‘A‘,‘B‘,‘c‘)) rdd.count() Result: long = 3 -- 展示数组 val rdd = sc.parallelize(list(‘A‘,‘B‘,‘c‘)) rdd.collect() Result: Array[char] = Array(A, B, c) -- 求和 val rdd = sc.parallelize(list(1,2,3,4)) rdd.reduce(_+_) Result: Int = 10 -- 截取 val rdd = sc.parallelize(list(1,2,3,4)) rdd.take(2) Result: Array[Int] = Array(1, 2) -- 分别格式化 val rdd = sc.parallelize(list(1,2,3,4)) rdd.foreach(x=>println("%s*10=%s".format(x,x*10))) Result: 1*10=10 4*10=40 3*10=30 2*10=20 val rdd = sc.parallelize(list(1,2,3,4)) -- 首项 rdd.first() Result: Int = 1 -- 另存为 val hamlet = sc.textFile("/users/akuntamukkala/temp/gutenburg.txt")
-- 针对两个pair RDD的转化操作(rdd = {(1, 2), (3, 4), (3, 6)} other = {(3, 9)}) -- subtractByKey 删掉RDD 中键与other RDD 中的键相同的元素 rdd.subtractByKey(other) {(1, 2)} -- join 对两个RDD 进行内连接 rdd.join(other) {(3, (4, 9)), (3,(6, 9))} -- rightOuterJoin 对两个RDD 进行连接操作,确保第一个RDD 的键必须存在(右外连接) rdd.rightOuterJoin(other) {(3,(Some(4),9)),(3,(Some(6),9))} -- leftOuterJoin 对两个RDD 进行连接操作,确保第二个RDD 的键必须存在(左外连接) rdd.leftOuterJoin(other) {(1,(2,None)), (3,(4,Some(9))), (3,(6,Some(9)))} -- cogroup 将两个RDD 中拥有相同键的数据分组到一起 rdd.cogroup(other) {(1,([2],[])), (3,([4, 6],[9]))}
-- 返回RDD 中的所有元素
rdd.collect() {1, 2, 3, 3}
-- RDD 中的元素个数
rdd.count() 4
-- 各元素在RDD 中出现的次数
rdd.countByValue() {(1, 1),(2, 1),(3, 2)}
-- 从RDD 中返回num 个元素
rdd.take(2) {1, 2} top(num)
-- 从RDD 中返回最前面的num个元素
rdd.top(2) {3, 3}
-- 从RDD 中按照提供的顺序返回最前面的num 个元素
rdd.takeOrdered(2)(myOrdering) {3, 3}
-- 从RDD 中返回任意一些元素
rdd.takeSample(false, 1)
-- 并行整合RDD 中所有数据(例如sum)
rdd.reduce((x, y) => x + y) 9
-- 和reduce() 一样, 但是需要提供初始值
rdd.fold(0)((x, y) => x + y) 9
-- 和reduce() 相似, 但是通常返回不同类型的函数
rdd.aggregate((0, 0))
((x, y) =>(x._1 + y, x._2 + 1),
(x, y) =>(x._1 + y._1, x._2 + y._2))
(9,4)
-- 对RDD 中的每个元素使用给定的函数
rdd.foreach(func)