标签:atm put input min res timeout code 统计 soc
package com.bawei.stream

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Stateful streaming word count.
 *
 * Reads newline-delimited text from a TCP socket and maintains a running
 * count per word across 5-second micro-batches using `updateStateByKey`.
 */
object StreamWC {

  def main(args: Array[String]): Unit = {
    // Spark configuration: local mode with 2 threads (one for the socket
    // receiver, one for batch processing — local[1] would starve processing).
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingTCP").setMaster("local[2]")
    // Build the SparkContext.
    val sc: SparkContext = new SparkContext(sparkConf)
    // Reduce log noise.
    sc.setLogLevel("WARN")
    // StreamingContext with a 5-second batch interval.
    val scc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // Checkpointing is mandatory for updateStateByKey: per-key state is
    // persisted here between batches. NOTE(review): hard-coded Windows path —
    // consider making this configurable.
    scc.checkpoint("C:\\Users\\Desktop\\checkpoint")
    // Register the host/port to collect data from.
    val lines: ReceiverInputDStream[String] = scc.socketTextStream("192.168.182.147", 9999)
    // Split each line into words.
    val words: DStream[String] = lines.flatMap(_.split(" "))
    // Pair each word with an initial count of 1.
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    // Alternative: per-batch aggregation only (no state across batches).
    //val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
    // Alternative: windowed aggregation (window length, slide interval).
    //val result: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((a:Int,b:Int)=>a+b,Seconds(10),Seconds(5))

    // Continuously update each word's running total: previous state (0 if the
    // word is new) plus the counts observed in this batch. Replaces the
    // original var + for-loop accumulation with an idiomatic Seq.sum.
    wordAndOne.updateStateByKey((newCounts: Seq[Int], state: Option[Int]) =>
      Some(state.getOrElse(0) + newCounts.sum)
    ).print()

    // Start the computation and block until termination.
    scc.start()
    scc.awaitTermination()
    //scc.awaitTerminationOrTimeout(15000)
  }

}
标签:atm put input min res timeout code 统计 soc
Original source: https://www.cnblogs.com/xjqi/p/12831505.html