标签:seconds split meta color consumer location ons put ams
1 package com.bawei.stream 2 3 import org.apache.kafka.clients.consumer.ConsumerRecord 4 import org.apache.kafka.common.serialization.StringDeserializer 5 import org.apache.log4j.{Level, Logger} 6 import org.apache.spark.streaming.dstream.{DStream, InputDStream} 7 import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 8 import org.apache.spark.streaming.kafka010.KafkaUtils 9 import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 10 import org.apache.spark.streaming.{Seconds, StreamingContext} 11 import org.apache.spark.{SparkConf, SparkContext} 12 13 14 object StreamFromKafka { 15 def updateFunc(a: Seq[Int], b: Option[Int]): Option[Int] = { 16 Some(a.sum + b.getOrElse(0)) 17 } 18 19 def main(args: Array[String]): Unit = { 20 21 val checkpointPath = "./kafka-direct" 22 23 val ssc = StreamingContext.getOrCreate(checkpointPath, () => { 24 createFunc(checkpointPath) 25 }) 26 ssc.start() 27 ssc.awaitTermination() 28 } 29 def createFunc(checkpointPath:String): StreamingContext = { 30 //todo:1、创建sparkConf 31 val sparkConf: SparkConf = new SparkConf() 32 .setAppName("SparkStreamingKafka_direct_checkpoint") 33 .setMaster("local[4]") 34 35 //todo:2、创建sparkContext 36 val sc = new SparkContext(sparkConf) 37 38 sc.setLogLevel("WARN") 39 //Logger.getLogger("org").setLevel(Level.ERROR) 40 //todo:3、创建StreamingContext 41 val ssc = new StreamingContext(sc, Seconds(5)) 42 ssc.checkpoint(checkpointPath) 43 //todo:4、kafka的参数配置 44 /*val kafkaParams=Map("metadata.broker.list" ->"node1:9092,node2:9092,node3:9092" 45 ,"group.id" -> "kafka-direct01")*/ 46 47 val kafkaParams = Map[String, Object]( 48 "bootstrap.servers" -> "192.168.182.147:9092,192.168.182.148:9092,192.168.182.149:9092", 49 "key.deserializer" -> classOf[StringDeserializer], 50 "value.deserializer" -> classOf[StringDeserializer], 51 "group.id" -> "group1" 52 ) 53 54 //todo:5、定义一个topics ,是一个集合,可以存放多个topic 55 val topics=Set("test") 56 57 //todo:6、利用KafkaUtils.createDirectStream构建Dstream 58 //val kafkaTopicDS: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics) 59 val kafkaTopicDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams)) 60 //todo:7、获取kafka中topic的数据 61 val kafkaData: DStream[String] = kafkaTopicDS.map(x=>x.value()) 62 63 //todo:8、切分每一行,每个单词记为1 64 val wordAndOne: DStream[(String, Int)] = kafkaData.flatMap(_.split(" ")).map((_,1)) 65 66 //todo:9、相同单词出现次数累加 67 val result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc) 68 69 //todo:打印 70 result.print() 71 ssc 72 } 73 }
标签:seconds split meta color consumer location ons put ams
原文地址:https://www.cnblogs.com/xjqi/p/12831521.html