码迷,mamicode.com
首页 > 其他好文 > 详细

Spark分析sogou日志, RDD

时间:2021-07-02 16:33:14      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:name   java   ugo   lin   rop   mutable   acea   als   array   

import com.hankcs.hanlp.HanLP
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.immutable.StringOps
import scala.collection.mutable


object SoGou_WC {

  /**
   * Analyses a Sogou search-engine query log with three RDD jobs:
   *   1. top-10 most frequent search keywords overall,
   *   2. top-10 (userId, keyword) pairs,
   *   3. top-10 busiest time buckets.
   *
   * Command-line arguments:
   *   args(0) = input path of the raw log,
   *   args(1) = output path for result 1,
   *   args(2) = output path for result 2,
   *   args(3) = output path for result 3.
   */
  def main(args: Array[String]): Unit = {
    // Set up the Spark environment (cluster mode — no master set here; pass
    // it via spark-submit. Uncomment the local[*] line for local testing).
    // val conf: SparkConf = new SparkConf().setAppName("SouGou_WC").setMaster("local[*]")
    val conf: SparkConf = new SparkConf().setAppName("SouGou_WC")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // Read the raw log: one record per line, fields separated by whitespace.
    // val data: RDD[String] = sc.textFile("data/input/SogouQ.sample")
    val data: RDD[String] = sc.textFile(args(0))

    // Parse each line into a SogouData record. Malformed lines (fewer than
    // 6 fields, or non-numeric rank/click columns) are skipped instead of
    // crashing the whole job — the original code threw
    // ArrayIndexOutOfBoundsException / NumberFormatException on bad input.
    val sogouDataRDD: RDD[SogouData] = data.flatMap { line =>
      val arr: Array[String] = line.split("\\s+")
      if (arr.length >= 6 &&
          arr(3).nonEmpty && arr(3).forall(_.isDigit) &&
          arr(4).nonEmpty && arr(4).forall(_.isDigit))
        Some(SogouData(arr(0), arr(1), arr(2), arr(3).toInt, arr(4).toInt, arr(5)))
      else
        None
    }

    // Segment a query string into words, stripping the surrounding [brackets]
    // first. Shared by jobs 1 and 2 (was duplicated inline in the original).
    def segmentQuery(query: String): mutable.Buffer[String] = {
      import scala.collection.JavaConverters._
      HanLP.segment(query.replaceAll("\\[|\\]", "")).asScala.map(_.word)
    }

    // Tokens treated as noise and excluded from all counts.
    def isNoise(word: String): Boolean = word == "+" || word == "."

    // Job 1: top-10 most frequent search keywords overall.
    val res1: Array[(String, Int)] = sogouDataRDD
      .flatMap(record => segmentQuery(record.data3))
      .filter(word => !isNoise(word))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
      .take(10)

    // Job 2: top-10 (userId, keyword) pairs — which user searched what most.
    val res2: Array[((String, String), Int)] = sogouDataRDD
      .flatMap { record =>
        val userid: String = record.data2
        segmentQuery(record.data3).map(word => (userid, word))
      }
      .filter(tuple => !isNoise(tuple._2))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
      .take(10)

    // Job 3: top-10 busiest time buckets. The first 5 characters of data1 are
    // used as the bucket key — presumably "HH:MM" of the access time; TODO
    // confirm against the actual log format.
    val res3: Array[(String, Int)] = sogouDataRDD
      .map(record => (record.data1.substring(0, 5), 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
      .take(10)

    // Print the results to the driver log.
    res1.foreach(println)
    res2.foreach(println)
    res3.foreach(println)

    // Persist each result as a single output file (coalesce to 1 partition).
    sc.makeRDD(res1).coalesce(1, shuffle = true).saveAsTextFile(args(1))
    sc.makeRDD(res2).coalesce(1, shuffle = true).saveAsTextFile(args(2))
    sc.makeRDD(res3).coalesce(1, shuffle = true).saveAsTextFile(args(3))

    // sc.makeRDD(res1).coalesce(1,true).saveAsTextFile("data/output1")
    // sc.makeRDD(res2).coalesce(1,true).saveAsTextFile("data/output2")
    // sc.makeRDD(res3).coalesce(1,true).saveAsTextFile("data/output3")

    // System.setProperty("HADOOP_USER_NAME","node") // HDFS write permission

    // Release Spark resources.
    sc.stop()
  }

  /**
   * One parsed Sogou log record (fields in log column order).
   *
   * Field meanings are inferred from how this file uses them — confirm
   * against the Sogou log documentation:
   * @param data1 access time; substring(0, 5) is used as a time bucket,
   *              so presumably "HH:MM:SS"
   * @param data2 user id (used as the key of job 2)
   * @param data3 search query, wrapped in square brackets in the raw log
   * @param data4 numeric column (rank in results? — TODO confirm)
   * @param data5 numeric column (click order? — TODO confirm)
   * @param data6 clicked URL — assumed; TODO confirm
   */
  case class SogouData(
    data1: String,
    data2: String,
    data3: String,
    data4: Int,
    data5: Int,
    data6: String
  )

}

Spark分析sogou日志, RDD

标签:name   java   ugo   lin   rop   mutable   acea   als   array   

原文地址:https://www.cnblogs.com/tan2022/p/14962765.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!