码迷,mamicode.com
首页 > 其他好文 > 详细

Streaming的单词统计

时间:2020-05-05 18:01:51      阅读:55      评论:0      收藏:0      [点我收藏+]

标签:atm   put   input   min   res   timeout   code   统计   soc   

 1 package com.bawei.stream
 2 
 3 import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
 4 import org.apache.spark.streaming.{Seconds, StreamingContext}
 5 import org.apache.spark.{SparkConf, SparkContext}
 6 
 7 object StreamWC {
 8 
 9   def main(args: Array[String]): Unit = {
10     //配置sparkConf参数
11     val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingTCP").setMaster("local[2]")
12     //构建sparkContext对象
13     val sc: SparkContext = new SparkContext(sparkConf)
14     //设置日志输出级别
15     sc.setLogLevel("WARN")
16     //构建StreamingContext对象,每个批处理的时间间隔
17     val scc: StreamingContext = new StreamingContext(sc,Seconds(5))
18     scc.checkpoint("C:\\Users\\Desktop\\checkpoint")
19     //注册一个监听的IP地址和端口  用来收集数据
20     val lines: ReceiverInputDStream[String] = scc.socketTextStream("192.168.182.147",9999)
21     //切分每一行记录
22     val words: DStream[String] = lines.flatMap(_.split(" "))
23     //每个单词记为1
24     val wordAndOne: DStream[(String, Int)] = words.map((_,1))
25     //分组聚合
26     //val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
27     //窗口时间,滑动时间
28     //val result: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((a:Int,b:Int)=>a+b,Seconds(10),Seconds(5))
29     //持续更新每个单词出现的次数
36 
37     wordAndOne.updateStateByKey((list:Seq[Int],option:Option[Int])=>{//
38       var before = option.getOrElse(0)//获取上一次的累加结果
39       for (value<-list){
40         before += value
41       }
42       Option(before)
43     }).print()
44 
45 
46 
47     scc.start()
48     scc.awaitTermination()
49     //scc.awaitTerminationOrTimeout(15000)
50   }
51 
52 }

 

Streaming的单词统计

标签:atm   put   input   min   res   timeout   code   统计   soc   

原文地址:https://www.cnblogs.com/xjqi/p/12831505.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!