标签:cas 分享 import host split cal res 溢出 result
自己前面的小练习一直都是在linux上面写的,可是最近由于要把他迁移到win上面,我在自己的csdn博客有对如何在win上面搭建spark环境做出说明,好了,我们还是先看看
今天的内容吧
1.假如你有一个文件,如果你想实现以前的mapReduce的操作,这个时候,如果我们使用spark则会变的非常的简单,如果你此时的文件是以"\t"进行分割的,那我就可以这
def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("UrlCount").setMaster("local") val sc = new SparkContext(conf) //rdd1将数据进行切分,元祖中放的是(URL,1) val rdd1 = sc.textFile("E://Test/itcast.log").map(line =>{ val f = line.split("\t") (f(1),1) }) val rdd2 = rdd1.reduceByKey(_+_) } 则此时的rdd2,就已经完成了wordCount的操作了
第一个练习(对一个数组进行循环处理)
package cn.wj.test.spark.day03 import org.apache.spark.{SparkConf, SparkContext} /** * Created by WJ on 2016/12/30. */ object ForeachDemo2 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("ForeachDemo2").setMaster("local[3]") val sc = new SparkContext(conf) val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9)) rdd1.foreach(println(_)) sc.stop() } }
2.第二个练习
package cn.wj.spark.day02 import java.net.URL import org.apache.spark.{SparkConf, SparkContext} /** * Created by WJ on 2016/12/30. */ // 这个是以java来进行排序,如果内存过大,可能会出现溢出的操作 object UrlCount { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("UrlCount").setMaster("local") val sc = new SparkContext(conf) //rdd1将数据进行切分,元祖中放的是(URL,1) val rdd1 = sc.textFile("E://Test/itcast.log").map(line =>{ val f = line.split("\t") (f(1),1) }) val rdd2 = rdd1.reduceByKey(_+_) val rdd3 = rdd2.map(t=>{ val url = t._1 val host = new URL(url).getHost() (host,url,t._2) }) // println(rdd2.collect.toBuffer) //这个的操作是,将rdd4的3以host的进行分组,软后并在每一个分组的情况下,以value中的第三个数据进行排序 //,并且只取前三个的排序 val rdd4 = rdd3.groupBy(_._1).mapValues(it =>{ it.toList.sortBy(_._3).reverse.take(3) }) println(rdd4.collect().toBuffer) } }
第三个练习
package cn.wj.test.spark.day03 import org.apache.spark.{SparkConf, SparkContext} import java.net.URL /** * Created by WJ on 2016/12/31. */ object AddUrlCount3 { val arr = Array("java.itcast.cn","php.itcast.cn","net.itcast.cn") def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("AppUrlCount3").setMaster("local") val sc = new SparkContext(conf) // val rdd1 = sc.textFile("E://Test/itcast.log").map(line =>{ // val f = line.split("\t") // (f(1),1) // }) val rdd1 = sc.textFile("E://Test/itcast.log").map( line =>{ val f = line.split("\t") (f(1),1) }) val rdd2 = rdd1.reduceByKey(_+_) val rdd3 = rdd2.map(t=>{ val url = t._1 val host = new URL(url).getHost() (host,url,t._2) }) for(ins <- arr){ val rdd = rdd3.filter(_._1==ins) val result = rdd.sortBy(_._3,false).take(3) println(result.toBuffer) } sc.stop() } }
标签:cas 分享 import host split cal res 溢出 result
原文地址:http://www.cnblogs.com/wnbahmbb/p/6240063.html