标签:时间 tar int() 拆分 input org etl 打印 结果
1 题目二:模拟实时获取监控得到的车牌数据,并对车牌信息进行处理(共计100分) 2 车牌信息(可自行构造): 3 京R916HA 沪A88888 京CDQ823 苏E799QV 4 贵A7W79N 冀J7CL78 京CFC828 京AL250E 5 浙A972C5 京S3K30E 京CR8106 京J7CL88 6 7 8 1)使用kafka生产发送车牌号码 9 2)创建SparkStream相应对象获取车牌信息 10 3)当车牌为京牌时不处理,将非京牌号码过滤出来 11 4)将过滤结果(非京牌号码)输出到文件中 12 5)在题2)基础上对车流量进行统计,每隔3分钟统计前5分钟的车流量(总数) (请注意模拟测试时的时间间隔) 13 6)打印出题5)的一个统计结果 14 15 注意:添加代码注释
1 package com.bawei.review01 2 3 import org.apache.kafka.clients.consumer.ConsumerRecord 4 import org.apache.kafka.common.serialization.StringDeserializer 5 import org.apache.spark.streaming.dstream.{DStream, InputDStream} 6 import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 7 import org.apache.spark.streaming.kafka010.KafkaUtils 8 import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 9 import org.apache.spark.{SparkConf, SparkContext} 10 import org.apache.spark.streaming.{Seconds, StreamingContext} 11 12 13 object SparkStreamingZY { 14 15 def main(args: Array[String]): Unit = { 16 val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingZY").setMaster("local[2]") 17 val sparkContext = new SparkContext(sparkConf) 18 19 sparkContext.setLogLevel("WARN") 20 val ssc = new StreamingContext(sparkContext,Seconds(1)) 21 22 //todo:从kafka获取到数据 23 val kafkaParams = Map[String, Object]( 24 "bootstrap.servers" -> "192.168.182.147:9092,192.168.182.148:9092,192.168.182.149:9092", 25 "key.deserializer" -> classOf[StringDeserializer], 26 "value.deserializer" -> classOf[StringDeserializer], 27 "group.id" -> "group1" 28 ) 29 //5、定义一个topics ,是一个集合,可以存放多个topic 30 val topics=Set("test") 31 //6、利用KafkaUtils.createDirectStream构建Dstream 32 val kafkaTopicDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams)) 33 //todo:从kafka返回的数据里提取出行数据封装成DStream据 34 val lineDs: DStream[String] = kafkaTopicDS.map(x=>x.value()) 35 //todo:lineDs 按照\t拆分车牌,获取车牌的第一个字,如果不是“京”,则将其打印输出 36 //lineDs.map(x=>x.split("\t").startsWith("京")) 37 38 //lineDs.flatMap(_.split("\t")).filter(!_.startsWith("京")).saveAsTextFiles("./carno") 39 val caoDs: DStream[(String, Int)] = lineDs.flatMap(_.split("\t")).map((a:String)=>("数量",1)) 40 caoDs.reduceByKeyAndWindow((a:Int,b:Int)=>a+b,Seconds(5),Seconds(3)).print() 41 42 ssc.start() 43 ssc.awaitTermination() 44 } 45 }
标签:时间 tar int() 拆分 input org etl 打印 结果
原文地址:https://www.cnblogs.com/xjqi/p/12831451.html