声明:本文基于spark的programming guide,并融合自己的相关理解整理而成
val conf = new SparkConf().setAppName("BasicRDDApp").setMaster("local[4]") //spark://host:port val sc = new SparkContext(conf) /** * parallelized collections * 将scala的集合数据,并行化成为能够并行计算的分布式数据集 */ val data = 1 to 1000 toArray val distData = sc.parallelize(data,10) //后面的数字是表示将集合切分成多少个块 ,通常是一个CPU 2-4块,通常spark可以自动帮你切分 val sum = distData.reduce((a, b) => a+b ) //在reduce的时候才开始真正的执行,driver将任务分布到各个机器上,然后每个机器单独执行,将计算的结果返回到driver程序 println("sum " + sum) /** * 读取外部的数据源 * 1.Hadoop支持的数据源 ,例如HDFS,Cassandra,HBase ,Amazon S3 * ##如果文件地址是本地地址的话,那么他应该在集群的每个节点上都能够被访问(即:每个节点上都应该有同样的文件) * ##textFile的第二个参数控制文件被切割的大小默认为64MB ,可以设置更大的但是不能设置更小的 */ val distFile = sc.textFile("file:///usr/local/spark/README.md") //接下来就可以进行相关的操作了 distFile.persist()//持久化 val len = distFile.map(s => 1).reduce((a, b) => a+b) println(len) val words = distFile.flatMap(l => l.split(" ")).map(w => (w,1)).reduceByKey((a,b) => a+b) //w => (v1+v2+v3+...) //map => 1->1 , flatMap => 1 -> 0..n print(words.count()) words foreach println val twords = distFile.flatMap(l => l.split(" ")).map(w => (w,1)).groupByKey() //分组 w => (v1, v2, v3 ...) twords foreach println //.map(w => (w,1)).foreach(w => w._1);
val broadcastVar = sc.broadcast("string test")//broadcast variable is readonly val v = broadcastVar.value println(v) val accum = sc.accumulator(0, "My Accumulator")//value and name sc.parallelize(1 to 1000000).foreach(x => accum+= 1) println(accum.name + ":" + accum.value)
原文地址:http://blog.csdn.net/youmengjiuzhuiba/article/details/41245603