
Integrating Spark Streaming with Kafka (StreamFromKafka)

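This post shows a complete example of consuming Kafka from Spark Streaming with the 0.10 direct API. The program subscribes to a topic through KafkaUtils.createDirectStream, performs a stateful word count with updateStateByKey, and builds its StreamingContext through StreamingContext.getOrCreate with a checkpoint directory, so the context (including the Kafka offsets already processed) can be restored after a restart.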

package com.bawei.stream

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object StreamFromKafka {

  // Accumulate each word's count across batches: `a` holds the counts seen in
  // the current batch, `b` holds the running total saved in the state store.
  // E.g. if "spark" appears 3 times in this batch and the saved state is
  // Some(5), the new state is Some(8).
  def updateFunc(a: Seq[Int], b: Option[Int]): Option[Int] = {
    Some(a.sum + b.getOrElse(0))
  }

  def main(args: Array[String]): Unit = {

    val checkpointPath = "./kafka-direct"

    // Restore the StreamingContext from the checkpoint if one exists,
    // otherwise build a fresh one with createFunc.
    val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
      createFunc(checkpointPath)
    })
    ssc.start()
    ssc.awaitTermination()
  }

  def createFunc(checkpointPath: String): StreamingContext = {
    // 1. Create the SparkConf
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("SparkStreamingKafka_direct_checkpoint")
      .setMaster("local[4]")

    // 2. Create the SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")

    // 3. Create the StreamingContext with a 5-second batch interval,
    //    and enable checkpointing (required by updateStateByKey)
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint(checkpointPath)

    // 4. Kafka consumer configuration
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.182.147:9092,192.168.182.148:9092,192.168.182.149:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group1"
    )

    // 5. The topics to subscribe to; the set may hold more than one topic
    val topics = Set("test")

    // 6. Build the direct DStream with KafkaUtils.createDirectStream
    val kafkaTopicDS: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    // 7. Extract the message value from each Kafka record
    val kafkaData: DStream[String] = kafkaTopicDS.map(x => x.value())

    // 8. Split each line into words and pair each word with a count of 1
    val wordAndOne: DStream[(String, Int)] = kafkaData.flatMap(_.split(" ")).map((_, 1))

    // 9. Accumulate the occurrences of each word across batches
    val result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)

    // Print each batch's running totals
    result.print()
    ssc
  }
}
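Checkpoint-based recovery works, but per the Spark Streaming + Kafka integration guide a checkpoint generally cannot be restored after the application code changes. An alternative is to commit offsets back to Kafka yourself after each batch. A minimal sketch, following the pattern from the integration guide and reusing the kafkaTopicDS stream from the listing above (you would typically also set "enable.auto.commit" -> "false" in kafkaParams):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

kafkaTopicDS.foreachRDD { rdd =>
  // Offset ranges are only available on the RDDs produced directly by the
  // input stream, before any transformation that changes the partitioning.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch here ...

  // Asynchronously commit the consumed offsets back to Kafka.
  kafkaTopicDS.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}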

 

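To feed the job, you can type lines into the test topic with the kafka-console-producer tool, or send records programmatically. Below is a minimal sketch of a test producer using the kafka-clients API; the object name TestProducer is made up for illustration, and the broker address and topic name come from the listing above, so adjust them to your cluster:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Hypothetical helper, not part of the original post.
object TestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "192.168.182.147:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // One record per line of text; the streaming job splits each value on spaces.
    producer.send(new ProducerRecord[String, String]("test", "hello spark hello kafka"))
    producer.close()
  }
}

With messages flowing, result.print() emits the running totals every 5 seconds; for the record above, the first batch prints (hello,2), (spark,1) and (kafka,1).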


Original source: https://www.cnblogs.com/xjqi/p/12831521.html
