码迷,mamicode.com
首页 > 其他好文 > 详细

模拟实时获取监控得到的车牌数据,并对车牌信息进行处理

时间:2020-05-05 18:17:57      阅读:94      评论:0      收藏:0      [点我收藏+]

标签:时间   tar   int()   拆分   input   org   etl   打印   结果   

 1     题目二:模拟实时获取监控得到的车牌数据,并对车牌信息进行处理(共计100分)
 2 车牌信息(可自行构造):
 3 京R916HA    沪A88888    京CDQ823    苏E799QV
 4 贵A7W79N    冀J7CL78    京CFC828    京AL250E
 5 浙A972C5    京S3K30E    京CR8106    京J7CL88
 6 
 7     
 8 1)使用kafka生产发送车牌号码
 9 2)创建SparkStream相应对象获取车牌信息
10 3)当车牌为京牌时不处理,将非京牌号码过滤出来        
11         4)将过滤结果(非京牌号码)输出到文件中
12         5)在题2)基础上对车流量进行统计,每隔3分钟统计前5分钟的车流量(总数)          (请注意模拟测试时的时间间隔)
13         6)打印出题5)的一个统计结果
14 
15      注意:添加代码注释
 1 package com.bawei.review01
 2 
 3 import org.apache.kafka.clients.consumer.ConsumerRecord
 4 import org.apache.kafka.common.serialization.StringDeserializer
 5 import org.apache.spark.streaming.dstream.{DStream, InputDStream}
 6 import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
 7 import org.apache.spark.streaming.kafka010.KafkaUtils
 8 import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
 9 import org.apache.spark.{SparkConf, SparkContext}
10 import org.apache.spark.streaming.{Seconds, StreamingContext}
11 
12 
13 object SparkStreamingZY {
14 
15   def main(args: Array[String]): Unit = {
16     val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingZY").setMaster("local[2]")
17     val sparkContext = new SparkContext(sparkConf)
18 
19     sparkContext.setLogLevel("WARN")
20     val ssc = new StreamingContext(sparkContext,Seconds(1))
21 
22     //todo:从kafka获取到数据
23     val kafkaParams = Map[String, Object](
24       "bootstrap.servers" -> "192.168.182.147:9092,192.168.182.148:9092,192.168.182.149:9092",
25       "key.deserializer" -> classOf[StringDeserializer],
26       "value.deserializer" -> classOf[StringDeserializer],
27       "group.id" -> "group1"
28     )
29     //5、定义一个topics ,是一个集合,可以存放多个topic
30     val topics=Set("test")
31     //6、利用KafkaUtils.createDirectStream构建Dstream
32     val kafkaTopicDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams))
33     //todo:从kafka返回的数据里提取出行数据封装成DStream据
34     val lineDs: DStream[String] = kafkaTopicDS.map(x=>x.value())
35     //todo:lineDs 按照\t拆分车牌,获取车牌的第一个字,如果不是“京”,则将其打印输出
36     //lineDs.map(x=>x.split("\t").startsWith("京"))
37     
38     //lineDs.flatMap(_.split("\t")).filter(!_.startsWith("京")).saveAsTextFiles("./carno")
39     val caoDs: DStream[(String, Int)] = lineDs.flatMap(_.split("\t")).map((a:String)=>("数量",1))
40     caoDs.reduceByKeyAndWindow((a:Int,b:Int)=>a+b,Seconds(5),Seconds(3)).print()
41 
42     ssc.start()
43     ssc.awaitTermination()
44   }
45 }

 

模拟实时获取监控得到的车牌数据,并对车牌信息进行处理

标签:时间   tar   int()   拆分   input   org   etl   打印   结果   

原文地址:https://www.cnblogs.com/xjqi/p/12831451.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!