import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object kv2RDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("My scala word count").setMaster("local")
    val sc = new SparkContext(conf)

    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",3),("a",2),("b",3),("c",4),("c",6),("c",8)), 3)
    rdd.saveAsTextFile("output")
    // part-00000: (a,3) (a,2)
    // part-00001: (b,3) (c,4)
    // part-00002: (c,6) (c,8)

    //1. groupByKey: groups the pair RDD by key, collecting all values for the same key into one sequence
    val groupByKeyRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
    groupByKeyRDD.collect().foreach(println)
    // (c,CompactBuffer(4, 6, 8))
    // (a,CompactBuffer(3, 2))
    // (b,CompactBuffer(3))

    //2. reduceByKey: aggregates values by key; a combine (map-side pre-aggregation) step runs before the shuffle
    val reduceByKeyRDD: RDD[(String, Int)] = rdd.reduceByKey((x, y) => x + y)
    reduceByKeyRDD.collect().foreach(println)
    // (c,18)
    // (a,5)
    // (b,3)

    //3. aggregateByKey: uses different functions for the intra-partition and inter-partition computations
    val aggregateByKeyRDD: RDD[(String, Int)] = rdd.aggregateByKey(0)(math.max(_, _), _ + _)
    aggregateByKeyRDD.collect().foreach(println)
    // (c,12)
    // (a,3)
    // (b,3)

    //4. foldByKey: uses the same function for the intra-partition and inter-partition computations
    val foldByKeyRDD: RDD[(String, Int)] = rdd.foldByKey(0)(_ + _)
    foldByKeyRDD.collect().foreach(println)
    // (c,18)
    // (a,5)
    // (b,3)

    //5. combineByKey: the most general aggregator; here it builds a (sum, count) pair per key
    val combineByKeyRDD: RDD[(String, (Int, Int))] = rdd.combineByKey(
      (_, 1),
      (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
      (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
    combineByKeyRDD.collect().foreach(println)
    // (c,(18,3))
    // (a,(5,2))
    // (b,(3,1))
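
    // Not in the original post: a minimal follow-up sketch showing how the (sum, count)
    // pairs produced by combineByKey can be turned into a per-key average with mapValues.
    // The name avgByKeyRDD is introduced here for illustration only.
    val avgByKeyRDD: RDD[(String, Double)] =
      combineByKeyRDD.mapValues { case (sum, cnt) => sum.toDouble / cnt }
    avgByKeyRDD.collect().foreach(println)
    // expected for this data: (c,6.0) (a,2.5) (b,3.0)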

    //6. sortByKey: sorts the RDD by key (ascending by default)
    val rdd2: RDD[(Int, String)] = sc.makeRDD(List((3,"a"),(2,"a"),(4,"c"),(6,"c"),(8,"c")), 3)
    val sortByKeyRDD: RDD[(Int, String)] = rdd2.sortByKey()
    sortByKeyRDD.collect().foreach(println)
    // (2,a)
    // (3,a)
    // (4,c)
    // (6,c)
    // (8,c)

    sc.stop()
  }
}
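As a side note (not from the original post), the four aggregation operators above can all express the same "sum per key" job on this data. A minimal comparison sketch, assuming it runs inside the same main and reuses the rdd defined above; each variant should collect to (a,5), (b,3), (c,18):

    // reduceByKey: no zero value, one merge function for both stages
    val viaReduce = rdd.reduceByKey(_ + _)
    // foldByKey: adds a zero value per partition
    val viaFold = rdd.foldByKey(0)(_ + _)
    // aggregateByKey: zero value plus separate intra-/inter-partition functions (identical here)
    val viaAggregate = rdd.aggregateByKey(0)(_ + _, _ + _)
    // combineByKey: additionally allows the combiner type to differ from the value type (it doesn't here)
    val viaCombine = rdd.combineByKey((v: Int) => v, (a: Int, v: Int) => a + v, (a: Int, b: Int) => a + b)

Each operator trades a little generality for a little convenience: reduceByKey is the simplest when the value and result types match, while combineByKey is the one to reach for when they don't, as in the (sum, count) example above.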
Original post: https://www.cnblogs.com/hapyygril/p/13720390.html