标签:
package RddApi import org.apache.spark.{SparkConf, SparkContext} /** * hadoop * spark * tachyon * hadoop * hbase * spark */ /** * Created by Administrator on 2016/4/23. */ object rdd0_wc { def main(args: Array[String]) { //创建环境变量 val conf=new SparkConf().setMaster("local").setAppName("rdd0_wc") //创建环境变量实例 val sc=new SparkContext(conf) //读取文件 val data=sc.textFile("E://sparkdata//wc.txt") data.flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_).collect().foreach(println) //wrod计数 } } |
package RddApi import org.apache.spark.{SparkConf, SparkContext} /** * Created by Administrator on 2016/4/23. */ object rdd1_aggregate { /** * aggregate方法 * RDD是工作在Spark上,因此,parallelize方法是将内存数据读入Spark系统中,作为一个整体的数据集。 * math.max方法用于比较数据集中数据的大小; * 第二个_+_方法是对传递的第一个比较方法结果进行处理。第一个比较结果是6,与中立空值相加,所以最终结果为6 */ def main(args: Array[String]) { val conf=new SparkConf().setMaster("local").setAppName("rdd1_aggregate") val sc=new SparkContext(conf) val arr=sc.parallelize(Array(1,2,3,4,5,6)) val result=arr.aggregate(0)(math.max(_, _), _ + _) //aggregate用法 println(result) } /** * 参数改变后 * 这里parallelize将数据分成两个节点存储 * math方法分别查找出两个数据集的最大值,分别是3和6. * 这样在调用aggregate方法的第二个计算方法时,将查找的数据值进行相加,获得最大值9 */ // def main(args: Array[String]) { // val conf=new SparkConf().setMaster("local").setAppName("rdd1_aggregate") // val sc=new SparkContext(conf) // val arr=sc.parallelize(Array(1,2,3,4,5,6),2) // val result=arr.aggregate(0)(math.max(_, _), _ + _) //aggregate用法 // println(result) // } /** * aggregate方法用于字符串 */ // def main(args: Array[String]) { // val conf=new SparkConf().setMaster("local").setAppName("rdd1_aggregate") // val sc=new SparkContext(conf) // val arr=sc.parallelize(Array("hadoop","spark","hive","hbase","kafka")) // val result=arr.aggregate("")((value,word)=>value+"、"+word,_+_) //aggregate用法 // println(result) // } } |
package RddApi import org.apache.spark.{SparkConf, SparkContext} /** * Created by Administrator on 2016/4/24. * cache方法的作用是将数据内容计算并保存在计算节点的内存中,这个方法的使用是针对Spark的Lazy数据处理模式。 * 在Lazy模式中,数据在编译和使用时是不进行计算的,而仅仅保存其存储地址,只有Action方法到来时才正式计算。 * 这样做的好处是可以极大的减少存储空间,从而提高利用率,而有时必须要求数据进行计算,此时就需要使用cache方法。 */ object rdd2_cache { def main(args: Array[String]) { val conf=new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("rdd2_cache") //设置名称 val sc=new SparkContext(conf) //创建环境变量实例 val arr=sc.parallelize(Array("hadoop","spark","hive")) //设定数据集 println(arr) //打印结果 val arr1=arr.cache() println("-----------------------------") //分隔符 arr1.foreach(println) //专门用来打印未进行Action操作的数据的专用方法,可以对数据进行提早计算。 } } |
package RddApi import org.apache.spark.{SparkConf, SparkContext} /** * Created by Administrator on 2016/4/24. * 笛卡尔积操作cartsian方法 */ object rdd3_cartesian { /** * 此方法用于对不同的数组进行笛卡尔积操作,要求是数据集的长度必须相同,结果作为一个新的数据集返回 */ def main(args: Array[String]) { val conf=new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("rdd3_cartesian") //设置名称 val sc=new SparkContext(conf) //创建环境变量实例 val arr=sc.parallelize(Array(1,2,3,4,5)) //创建第一个数组 val arr1=sc.parallelize(Array(6,7,8,9,10)) //创建第二个数组 val result=arr.cartesian(arr1) //进行笛卡尔积计算 result.foreach(println) //打印结果 } } |
package RddApi import org.apache.spark.{SparkConf, SparkContext} /** * Created by Administrator on 2016/4/24. */ object rdd4_coalesce { /** * coalesce方法是将已经存储的数据重新分片后再进行存储 * 第一个参数是将数据重新分成的片数,布尔型数指的是将数据分成更小的片时使用。 */ // def main(args: Array[String]) { // val conf=new SparkConf() //创建环境变量 // .setMaster("local") //设置本地化处理 // .setAppName("rdd4_coalesce") //设置名称 // val sc=new SparkContext(conf) //创建环境变量实例 // val arr=sc.parallelize(Array(1,2,3,4,5,6)) // val arr2=arr.coalesce(2,true) //将数据重新分区 // val result=arr.aggregate(0)(math.max(_,_),_+_) //计算数据值 // println(result) // val result2=arr2.aggregate(0)(math.max(_,_),_+_) //计算重新分区数据值 // println(result2) // } /** * RDD中还有一个repartition方法与这个coalesce方法类似,均是将数据重新分区组合 */ def main(args: Array[String]) { val conf=new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("rdd4_coalesce") //设置名称 val sc=new SparkContext(conf) //创建环境变量实例 val arr=sc.parallelize(Array(1,2,3,4,5,6)) val arr2=arr.repartition(3) //将数据分区 println(arr2.partitions.length) //打印分区结果 } } |
countByValue用法:
package RddApi import org.apache.spark.{SparkConf, SparkContext} /** * Created by Administrator on 2016/4/24. */ object rdd5_countByValue { /** * countByValue方法是计算数据集中某个数据出现的个数,并将其以map的形式返回 * @param args */ def main(args: Array[String]) { val conf=new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("rdd5_countByValue") //设置名称 val sc=new SparkContext(conf) //创建环境变量实例 val arr=sc.parallelize(Array(1,1,2,2,3,3,4,5,6)) val result=arr.countByValue() //计算个数 result.foreach(print) } } |
package RddApi import org.apache.spark.{SparkConf, SparkContext} /** * Created by Administrator on 2016/4/24. */ object rdd6_countByKey { /** * countByKey方法与countByValue方法有本质的区别。 * countByKey是计算数组中元数据键值对Key出现的个数 */ def main(args: Array[String]) { val conf=new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("rdd6_countByKey") //设置名称 val sc=new SparkContext(conf) //创建环境变量实例 val arr=sc.parallelize(Array((1,"hadoop"),(2,"spark"),(3,"tachyon"))) val result=arr.countByKey() //进行计数 result.foreach(print) } } |
package RddApi import org.apache.spark.{SparkConf, SparkContext} /** * Created by Administrator on 2016/4/24. */ object rdd7_distinct { /** * distinct方法作用是去除数据集中重复项 */ def main(args: Array[String]) { val conf=new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("rdd7_distinct") //设置名称 val sc=new SparkContext(conf) //创建环境变量实例 val arr=sc.parallelize(Array(("hadoop"),("spark"),("hive"),("hadoop"),("hive"))) val result=arr.distinct() result.foreach(println) } } |
package RddApi import org.apache.spark.{SparkConf, SparkContext} /** * Created by Administrator on 2016/4/24. */ object rdd8_filter { /** * filter方法用来对数据集进行过滤 */ def main(args: Array[String]) { val conf=new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("rdd8_filter") //设置名称 val sc=new SparkContext(conf) //创建环境变量实例 val arr=sc.parallelize(Array(1,2,3,4,5)) val result=arr.filter(_>=3) result.foreach(println) } } |
package RddApi import org.apache.spark.{SparkConf, SparkContext} /** * Created by Administrator on 2016/4/24. */ object rdd9_flatMap { def main(args: Array[String]) { /** * flatMap方法是对RDD中的数据集进行整体操作的一个特殊方法。 */ val conf=new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("rdd9_flatMap") //设置名称 val sc=new SparkContext(conf) //创建环境变量实例 val arr=sc.parallelize(Array(1,2,3,4,5,6,7)) val result=arr.flatMap(x=>List(x+1)).collect() //进行数据集计算 result.foreach(println) } } |
package RddApi import org.apache.spark.{SparkConf, SparkContext} /** * Created by Administrator on 2016/4/24. */ object rdd10_map { /** * map方法是对RDD中的数据集中的数据进行逐个处理。 * map与flatMap不同之处在于,flatMap是将数据集中的数据作为一个整体去处理,之后再对其中的数据做计算。 * 而map方法直接对数据集中的数据做单独的处理。 */ def main(args: Array[String]) { val conf=new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("rdd10_map") //设置名称 val sc=new SparkContext(conf) //创建环境变量实例 val arr=sc.parallelize(Array(1,2,3,4,5,6)) val result=arr.map(x=>List(x+1)).collect() result.foreach(println) } } |
package RddApi import org.apache.spark.{SparkConf, SparkContext} /** * Created by Administrator on 2016/4/24. */ object rdd11_groupBy { /** * groupBy方法是将传入的数据进行分组 * 传入的第一个参数是方法名,第二个参数是分组的标签 */ def main(args: Array[String]) { val conf=new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("rdd11_groupBy") //设置名称 val sc=new SparkContext(conf) //创建环境变量实例 val arr=sc.parallelize(Array(1,2,3,4,5,6)) arr.groupBy(myFilter1).foreach(println) //设置第一个分组 } def myFilter1(num:Int):Int={ //自定义第一个分组 num %2 } } |
package RddApi import org.apache.spark.{SparkConf, SparkContext} /** * Created by Administrator on 2016/4/24. */ object rdd12_keyBy { /** * keyBy方法是为数据集中的每个个体数据增加一个Key,从而可以和原来的数据集形成键值对。 */ def main(args: Array[String]) { val conf=new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("rdd12_keyBy") //设置名称 val sc=new SparkContext(conf) //创建环境变量实例 val arr=sc.parallelize(Array("hadoop","spark","tachyon","hive","hbase")) val str=arr.keyBy(word=>word.size) //设置配置方法 str.foreach(println) } } |
package RddApi import org.apache.spark.{SparkConf, SparkContext} /** * Created by Administrator on 2016/4/24. */ object rdd13_reduce { /** * reduce方法主要是对传入的数据进行合并处理。 * 第一个下划线代表数据集中的第一个参数。 * 第二个下划线在第一次合并处理时代表空集 */ def main(args: Array[String]) { // val conf=new SparkConf() //创建环境变量 // .setMaster("local") //设置本地化处理 // .setAppName("rdd13_reduce") //设置名称 // val sc=new SparkContext(conf) //创建环境变量实例 // val arr=sc.parallelize(Array("hadoop","spark","tachyon","hive","hbase")) // val result=arr.reduce(_+_) // result.foreach(print) /** * 寻找最长字符串 */ val conf=new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("rdd13_reduce") //设置名称 val sc=new SparkContext(conf) //创建环境变量实例 val str=sc.parallelize(Array("hadoop","spark","tachyon","hive")) val result=str.reduce(myFun) //进行数据拟合 result.foreach(print) } def myFun(str1:String,str2:String):String={ var str=str1 if(str2.size>=str.size){ str=str2 } return str } } |
package RddApi import org.apache.spark.{SparkConf, SparkContext} /** * Created by Administrator on 2016/4/24. */ object rdd14_sortBy { /** * sortBy方法是一个常用的排序方法,其功能是对已有的RDD重新排序,并将重新排序后的数据生成一个新的RDD。 * sortBy有3个参数: * 第一个参数:传入方法,用以计算排序的方法 * 第二个参数:指定排序的值按升序还是降序显示 * 第三个参数:分片的数量 */ def main(args: Array[String]) { val conf=new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("rdd14_sortBy") //设置名称 val sc=new SparkContext(conf) //创建环境变量实例 val str=sc.parallelize(Array((1,"hadoop"),(2,"spark"),(3,"hive"),(4,"hbase"),(5,"tachyon"))) val str1=str.sortBy(word=>word._1,true) //按照第一个数据排序 val str2=str.sortBy(word=>word._2,true) //按照第二个数据排序 str1.foreach(println) str2.foreach(println) } } |
按照第一个数据排序:
按照第二个数据排序:
package RddApi import org.apache.spark.{SparkConf, SparkContext} /** * Created by Administrator on 2016/4/24. * zip方法是常用的合并压缩算法,它将若干个RDD压缩成一个新的DD,进而形成一系列的键值对存储形式的新RDD。 */ object rdd15_zip { def main(args: Array[String]) { val conf=new SparkConf() //创建环境变量 .setMaster("local") //设置本地化处理 .setAppName("rdd15_zip") //设置名称 val sc=new SparkContext(conf) //创建环境变量实例 val arr1=sc.parallelize(Array(1,2,3,4,5,6)) val arr2=sc.parallelize(Array("a","b","c","d","e","f")) val arr3=sc.parallelize(Array("g","h","i","j","k","l")) val arr4=arr1.zip(arr2).zip(arr3) //进行压缩算法 arr4.foreach(println) } } |
标签:
原文地址:http://blog.csdn.net/baolibin528/article/details/51405415