
Word Count over Produced and Consumed Kafka Messages with Spark Streaming

Posted: 2020-05-05 17:47:17


package com.bawei.review01

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkStreamingWC {
  def main(args: Array[String]): Unit = {

    // Local mode with 2 threads: one to receive data, one to process it
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingWC").setMaster("local[2]")
    val sparkContext = new SparkContext(sparkConf)
    sparkContext.setLogLevel("WARN")

    // Micro-batch interval of 5 seconds
    val ssc = new StreamingContext(sparkContext, Seconds(5))

    // Kafka consumer configuration
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.182.147:9092,192.168.182.148:9092,192.168.182.149:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group1"
    )

    // The topics to subscribe to; a Set, so multiple topics can be consumed
    val topics = Set("test")

    // Build the DStream with KafkaUtils.createDirectStream
    val kafkaTopicDS: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    // Extract the message value from each Kafka record
    val socketline: DStream[String] = kafkaTopicDS.map(record => record.value())

    // Split each line into words and pair each word with an initial count of 1
    val dataDS: DStream[(String, Int)] = socketline.flatMap(_.split(" ")).map((_, 1))

    // Sum the counts per word within each batch, e.g. (kello,3), (hello,2)
    dataDS.reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
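The core of the job is a classic per-batch word count: flatMap splits each line into words, map pairs each word with 1, and reduceByKey sums the counts. The same logic can be sketched on plain Scala collections, independent of Spark and Kafka (a sketch only; in the job above, Spark distributes these operations over the records of each 5-second micro-batch):

```scala
// Sketch of the per-batch word-count transformation on plain Scala
// collections. groupBy + sum stands in for Spark's reduceByKey.
object WordCountSketch {
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))   // split each line into words
      .map(word => (word, 1))  // pair each word with an initial count
      .groupBy(_._1)           // group pairs by word (reduceByKey analogue)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    val batch = Seq("hello spark", "hello kafka")
    println(countWords(batch))  // counts: hello -> 2, spark -> 1, kafka -> 1
  }
}
```

The difference from the streaming job is scope: this sketch counts within one in-memory batch, just as reduceByKey on a DStream counts only within each micro-batch; counts are not accumulated across batches unless a stateful operation is used.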

 


Original post: https://www.cnblogs.com/xjqi/p/12831484.html
