import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("RDD1").setMaster("local")
val sc = new SparkContext(conf)
From memory: parallelize or makeRDD
From an external file: textFile
// 1. Both parallelize and makeRDD create an RDD from an in-memory collection
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)   // parallelize
val distData1 = sc.makeRDD(data)      // makeRDD

// 2. textFile creates an RDD from an external file
val distFile = sc.textFile("E:/Java_WS/ScalaDemo/data/wc.txt")
The following two are equivalent:

myRDD.map(s => (s, 1))
myRDD.map((_, 1))
reduceByKey, sortByKey, and groupByKey
distFile.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
distFile.flatMap(_.split(" ")).map(s => (s, 1)).sortByKey().collect().foreach(println)
distFile.flatMap(_.split(" ")).map(s => (s, 1)).groupByKey().collect().foreach(println)
1) reduceByKey returns each key and its aggregated count: (key, cnt)
2) sortByKey returns the (key, value) pairs sorted by key
3) groupByKey returns (key, (value1, value2, ...)), i.e. all values grouped per key
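As a quick end-to-end illustration, here is a minimal sketch of the three operations using an invented in-memory word list instead of wc.txt (the sample words are made up; output order may vary across runs):

val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"))
val pairs = words.map((_, 1))

pairs.reduceByKey(_ + _).collect().foreach(println)   // (a,3), (b,2), (c,1)
pairs.sortByKey().collect().foreach(println)          // (a,1) three times, then (b,1) twice, then (c,1)
pairs.groupByKey().collect().foreach(println)         // (a,CompactBuffer(1, 1, 1)), ...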
persist() or cache() keeps a computed RDD in memory so later actions can reuse it.
unpersist() removes a cached RDD.
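A minimal sketch of the caching workflow (reusing the file path from above; the explicit storage level in the last comment is just one option):

import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("E:/Java_WS/ScalaDemo/data/wc.txt")
lines.cache()            // shorthand for persist(StorageLevel.MEMORY_ONLY)
println(lines.count())   // first action computes the RDD and caches it
println(lines.count())   // second action is served from the cache
lines.unpersist()        // evict the cached blocks
// lines.persist(StorageLevel.MEMORY_AND_DISK)  // persist() also accepts other storage levels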
// SparkContext.broadcast(v) creates a broadcast variable: a read-only copy of v
// shipped once to each node instead of once per task
val broadcastVar = sc.broadcast(Array(1, 2, 3))
println((broadcastVar.value(0), broadcastVar.value(1), broadcastVar.value(2)))

// Accumulators are added to on the executors and read back on the driver
val accum = sc.accumulator(0, "My Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
println(accum.value)   // 10
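Note: sc.accumulator is the Spark 1.x API and has been deprecated since Spark 2.0. A sketch of the equivalent on Spark 2.x and later:

val accum2 = sc.longAccumulator("My Accumulator")   // assumes a Spark 2.x+ SparkContext
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum2.add(x))
println(accum2.value)   // 10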
Original article: http://www.cnblogs.com/skyEva/p/5867472.html