// Tags (标签, as listed on the source blog): line, creat, 转化 (conversion), spl, min, new, col, util, mem
package com.bawei.stream

import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Running word count over events pulled from a Flume agent's Spark sink
 * (`FlumeUtils.createPollingStream`), keeping per-word totals across batches
 * with `updateStateByKey`.
 *
 * Usage: `StreamFlume [host:port[,host:port...]] [checkpointDir]`.
 * Missing arguments fall back to [[StreamFlume.DefaultFlumeAddresses]] and
 * [[StreamFlume.DefaultCheckpointDir]].
 */
object StreamFlume {

  /** Flume Spark-sink address(es) used when none are given on the command line. */
  val DefaultFlumeAddresses: String = "192.168.182.147:8888"

  /** Checkpoint directory used when none is given on the command line. */
  val DefaultCheckpointDir: String = "C:\\Users\\Desktop\\checkpoint2"

  /**
   * State-update function for `updateStateByKey`: adds this batch's counts for a key
   * to its running total.
   *
   * @param newValues    counts for the key in the current batch (may be empty)
   * @param runningCount total from previous batches, `None` if the key has no state yet
   * @return the new running total; always defined, so a key's state is never removed
   */
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] =
    Some(runningCount.getOrElse(0) + newValues.sum)

  /**
   * Decodes a Flume event body as UTF-8 text.
   *
   * Only the readable bytes (position until limit) are decoded, and a duplicate is read so
   * the caller's buffer position is not advanced. `ByteBuffer.array()` would instead expose
   * the whole backing array regardless of position/limit, and throws for read-only or
   * direct buffers.
   * NOTE(review): assumes the upstream Flume agent writes UTF-8 bodies — confirm against
   * the agent's source configuration.
   *
   * @param body event body buffer
   * @return decoded text; malformed bytes become U+FFFD replacement characters
   */
  def bodyToString(body: ByteBuffer): String =
    StandardCharsets.UTF_8.decode(body.duplicate()).toString

  /**
   * Splits a line into words on runs of whitespace.
   *
   * Empty tokens, which `split` produces for leading whitespace or for an empty line,
   * are dropped so they are not counted as words.
   */
  def tokenize(line: String): Seq[String] =
    line.split("\\s+").filter(_.nonEmpty).toSeq

  /**
   * Parses a comma-separated list of `host:port` entries into socket addresses.
   *
   * @param spec e.g. `"192.168.182.147:8888,10.0.0.2:8888"`; whitespace around entries is ignored
   * @return one address per entry, in input order
   * @throws IllegalArgumentException if no entry is present, an entry lacks a host or port,
   *                                  or a port is out of range
   * @throws NumberFormatException    if a port is not an integer
   */
  def parseAddresses(spec: String): Seq[InetSocketAddress] = {
    val entries = spec.split(",").map(_.trim).filter(_.nonEmpty).toSeq
    require(entries.nonEmpty, s"No Flume address given in '$spec'")
    entries.map { hostPort =>
      // lastIndexOf so the port is always the final ':'-separated component.
      val sep = hostPort.lastIndexOf(':')
      require(sep > 0 && sep < hostPort.length - 1, s"Expected host:port but got '$hostPort'")
      new InetSocketAddress(hostPort.substring(0, sep), hostPort.substring(sep + 1).toInt)
    }
  }

  /**
   * Entry point: starts the streaming job and blocks until it terminates.
   *
   * @param args optional `(0)` comma-separated Flume `host:port` list, `(1)` checkpoint directory
   */
  def main(args: Array[String]): Unit = {
    val flumeAddresses = parseAddresses(args.headOption.getOrElse(DefaultFlumeAddresses))
    val checkpointDir = args.drop(1).headOption.getOrElse(DefaultCheckpointDir)

    // The Spark Streaming guide requires local[n] with n greater than the number of
    // receivers; this job has one receiver.
    val sparkConf: SparkConf =
      new SparkConf().setAppName("SparkStreaming_Flume_Poll").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")

    // 5-second batch interval.
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // updateStateByKey needs a checkpoint directory.
    // NOTE(review): the context is always built fresh (not via StreamingContext.getOrCreate),
    // so accumulated counts are not recovered after a driver restart.
    ssc.checkpoint(checkpointDir)

    // Pull events from the Flume agent's Spark sink(s).
    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
      FlumeUtils.createPollingStream(ssc, flumeAddresses, StorageLevel.MEMORY_AND_DISK)

    // The payload of each event is its body; decode it to text.
    val lineStream: DStream[String] = flumeStream.map(e => bodyToString(e.event.getBody))

    // Running word count across all batches.
    val result: DStream[(String, Int)] =
      lineStream.flatMap(tokenize).map(word => (word, 1)).updateStateByKey(updateFunction)

    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
// Tags (标签, as listed on the source blog): line, creat, 转化 (conversion), spl, min, new, col, util, mem
// Original article (原文地址): https://www.cnblogs.com/xjqi/p/12831494.html