码迷,mamicode.com
首页 > Windows程序 > 详细

spark基于win上面的操作

时间:2016-12-31 20:43:07      阅读:243      评论:0      收藏:0      [点我收藏+]

标签:cas   分享   import   host   split   cal   res   溢出   result   

  自己前面的小练习一直都是在linux上面写的,可是最近由于要把他迁移到win上面,我在自己的csdn博客有对如何在win上面搭建spark环境做出说明,好了,我们还是先看看

  今天的内容吧

    1.假如你有一个文件,如果你想实现以前的mapReduce的操作,这个时候,如果我们使用spark则会变的非常的简单,如果你此时的文件是以"\t"进行分割的,那我就可以这

def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("UrlCount").setMaster("local")
    val sc = new SparkContext(conf)

    //rdd1将数据进行切分,元祖中放的是(URL,1)
     val rdd1 = sc.textFile("E://Test/itcast.log").map(line =>{
      val f = line.split("\t")
      (f(1),1)
    })
      val rdd2 = rdd1.reduceByKey(_+_)
  }
则此时的rdd2,就已经完成了wordCount的操作了

  第一个练习(对一个数组进行循环处理)

package cn.wj.test.spark.day03

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by WJ on 2016/12/30.
  */
object ForeachDemo2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ForeachDemo2").setMaster("local[3]")
    val sc = new SparkContext(conf)
    val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9))
    rdd1.foreach(println(_))
    sc.stop()
  }
}

  技术分享

 

  2.第二个练习

  

package cn.wj.spark.day02

import java.net.URL

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by WJ on 2016/12/30.
  */
// 这个是以java来进行排序,如果内存过大,可能会出现溢出的操作
object UrlCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("UrlCount").setMaster("local")
    val sc = new SparkContext(conf)

    //rdd1将数据进行切分,元祖中放的是(URL,1)
     val rdd1 = sc.textFile("E://Test/itcast.log").map(line =>{
      val f = line.split("\t")
      (f(1),1)
    })
    val rdd2 = rdd1.reduceByKey(_+_)
    val rdd3 = rdd2.map(t=>{
      val url = t._1
      val host = new URL(url).getHost()
      (host,url,t._2)
    })
//    println(rdd2.collect.toBuffer)
    //这个的操作是,将rdd4的3以host的进行分组,软后并在每一个分组的情况下,以value中的第三个数据进行排序
    //,并且只取前三个的排序
    val rdd4 = rdd3.groupBy(_._1).mapValues(it =>{
      it.toList.sortBy(_._3).reverse.take(3)
    })
    println(rdd4.collect().toBuffer)
  }
}

  技术分享

 

  第三个练习

  

package cn.wj.test.spark.day03

import org.apache.spark.{SparkConf, SparkContext}
import java.net.URL
/**
  * Created by WJ on 2016/12/31.
  */
object AddUrlCount3 {

  val arr = Array("java.itcast.cn","php.itcast.cn","net.itcast.cn")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("AppUrlCount3").setMaster("local")
    val sc = new SparkContext(conf)

//    val rdd1 = sc.textFile("E://Test/itcast.log").map(line =>{
//       val f = line.split("\t")
//      (f(1),1)
//    })

     val rdd1 = sc.textFile("E://Test/itcast.log").map( line =>{
       val f = line.split("\t")
       (f(1),1)
     })
     val rdd2 = rdd1.reduceByKey(_+_)
    val rdd3 = rdd2.map(t=>{
      val url = t._1
      val host = new URL(url).getHost()
      (host,url,t._2)
    })

    for(ins <- arr){
      val rdd = rdd3.filter(_._1==ins)
      val result = rdd.sortBy(_._3,false).take(3)
      println(result.toBuffer)
    }
    sc.stop()

  }
}

  技术分享

spark基于win上面的操作

标签:cas   分享   import   host   split   cal   res   溢出   result   

原文地址:http://www.cnblogs.com/wnbahmbb/p/6240063.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!