Big Data with Spark: Generating Mock Data (running in local-file and Kafka modes)

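The Scala program below simulates vehicle-surveillance records (date, checkpoint, camera, plate number, capture time, speed, road, area) and can either write them to a local file or publish them to a Kafka topic; the local-file calls are left commented out so either mode can be switched on.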

import java.io.{File, PrintWriter}
import java.text.SimpleDateFormat
import java.util.{Date, Properties}

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.util.Random

/**
 * Generates mock data in the following format:
 *
 * date   checkpoint ID   camera ID   plate number   capture time   speed   road ID   area ID
 * date   monitor_id      camera_id   car            action_time    speed   road_id   area_id
 *
 * monitor_flow_action
 * monitor_camera_info
 *
 * @author Administrator
 */
object MockData {
  /**
   * Returns a random numeric string of n digits.
   *
   * @param index  number of digits
   * @param random shared random generator
   * @return the generated digit string
   */
  def randomNum(index: Int, random: Random): String = {
    var str = ""
    for (i <- 0 until index) {
      str += random.nextInt(10)
    }
    str
  }

  /**
   * Draws a random number in [0, num) and left-pads it with zeros
   * to the given width, e.g. hours/minutes/seconds "01".."09",
   * or checkpoint IDs "0001".."0009".
   *
   * @param random shared random generator
   * @param num    exclusive upper bound of the random draw
   * @param index  target width after zero-padding
   * @return the zero-padded number as a string
   */
  def fillZero(random: Random, num: Int, index: Int): String = {
    ("%0" + index + "d").format(random.nextInt(num))
  }
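  // Example: fillZero(random, 24, 2) yields "00".."23";
  // fillZero(random, 10, 4) yields "0000".."0009".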

  /**
   * Opens an output stream over the given file.
   *
   * @param path output file path
   * @return a PrintWriter for the file
   */
  def initFile(path: String): PrintWriter = {
    new PrintWriter(new File(path))
  }

  /**
   * Writes one record to the file, followed by a newline.
   *
   * @param pw      the open writer
   * @param content the record to write
   */
  def writeDataToFile(pw: PrintWriter, content: String): Unit = {
    pw.write(content + "\n")
  }

  /**
   * Closes the file stream.
   *
   * @param pw the writer to close
   */
  def closeFile(pw: PrintWriter): Unit = {
    pw.close()
  }

  /**
   * Initializes a Kafka producer (broker at wanghy:9092, string keys and values).
   *
   * @return the configured producer
   */
  def initKafkaProducer(): KafkaProducer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", "wanghy:9092")
    props.put("acks", "all") // wait for all in-sync replicas to acknowledge each record
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducer[String, String](props)
  }
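  // To check that records arrive, a console consumer can be attached to the topic
  // (assuming a standard Kafka installation on the broker host):
  //   bin/kafka-console-consumer.sh --bootstrap-server wanghy:9092 --topic 1711f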

  /**
   * Sends one record to the Kafka topic "1711f".
   *
   * @param producer the producer to send with
   * @param content  the record value
   */
  def writeDataToKafka(producer: KafkaProducer[String, String], content: String): Unit = {
    producer.send(new ProducerRecord[String, String]("1711f", content))
  }

  /**
   * Closes the Kafka producer.
   *
   * @param producer the producer to close
   */
  def closeKafka(producer: KafkaProducer[String, String]): Unit = {
    producer.close()
  }
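  // Note: KafkaProducer.close() blocks until previously sent records are flushed,
  // so records buffered by send() are not lost on shutdown.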

  /**
   * Generates the mock data.
   */
  def mock(): Unit = {
    // For local-file mode, open a file output stream instead:
    // val pw = initFile("d://monitor_action.log")
    // Initialize the Kafka producer
    val producer = initKafkaProducer()

    val random = new Random()

    // Plate prefixes, weighted so that most cars are from 京 (Beijing)
    val locations = Array("鲁", "京", "京", "京", "沪", "京", "京", "深", "京", "京")
    // Current date, e.g. 2020-06-06
    val day = new SimpleDateFormat("yyyy-MM-dd").format(new Date())

    // Simulate 3000 vehicles
    for (i <- 0 until 3000) {
      // Plate number, e.g. 京A00001
      val car = locations(random.nextInt(10)) + (65 + random.nextInt(26)).toChar + randomNum(5, random)
      // Base capture time, precise to the hour, e.g. "2020-06-06 11"
      var baseActionTime = day + " " + fillZero(random, 24, 2)

      // This loop simulates each vehicle passing different checkpoints and cameras.
      for (j <- 0 until random.nextInt(300)) {
        // After every 30 captures, advance the hour by one so the data looks more realistic.
        if (j % 30 == 0 && j != 0) {
          var nextHour = ""
          val baseHour = baseActionTime.split(" ")(1)

          if (baseHour.startsWith("0")) {
            if (baseHour.endsWith("9")) {
              nextHour = "10" // "09" -> "10"
            } else {
              nextHour = "0" + (baseHour.substring(1).toInt + 1).toString // e.g. "05" -> "06"
            }
          } else if (baseHour == "23") {
            nextHour = fillZero(random, 24, 2) // past midnight: pick a fresh random hour
          } else {
            nextHour = (baseHour.toInt + 1).toString // e.g. "13" -> "14"
          }
          baseActionTime = day + " " + nextHour
        }
        val actionTime = baseActionTime + ":" + fillZero(random, 60, 2) + ":" + fillZero(random, 60, 2)
        val monitorId = fillZero(random, 10, 4) // checkpoint ID: "0000".."0009"
        val speed = random.nextInt(200) + 1 // speed: 1-200
        val roadId = random.nextInt(50) + 1 // road ID: 1-50 (50 roads)
        val cameraId = "0" + randomNum(4, random) // 5-digit camera ID
        val areaId = "%02d".format(random.nextInt(8) + 1) // area ID: "01".."08" (8 areas)

        // Assemble one tab-separated record
        val content = day + "\t" + monitorId + "\t" + cameraId + "\t" + car + "\t" + actionTime + "\t" + speed + "\t" + roadId + "\t" + areaId

        // In local-file mode, write to the file instead:
        // writeDataToFile(pw, content)
        // Send to Kafka
        writeDataToKafka(producer, content)

        Thread.sleep(50)
      }
    }
    // closeFile(pw)
    closeKafka(producer)
  }

  def main(args: Array[String]): Unit = {
    mock()
  }
}
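Each emitted line is a single tab-separated record. As a quick illustration, here is what a record and a minimal parser for it could look like; the sample values, the MonitorRecord case class, and the parseRecord helper are illustrative and not part of the original program:

// A possible record (fields: date, monitor_id, camera_id, car, action_time, speed, road_id, area_id,
// separated by tabs in the actual output):
// 2020-06-06  0005  04521  京A39507  2020-06-06 11:37:02  88  23  03
case class MonitorRecord(date: String, monitorId: String, cameraId: String,
                         car: String, actionTime: String, speed: Int,
                         roadId: Int, areaId: String)

def parseRecord(line: String): MonitorRecord = {
  val f = line.split("\t")
  MonitorRecord(f(0), f(1), f(2), f(3), f(4), f(5).toInt, f(6).toInt, f(7))
}

On the consuming side, a Spark Streaming job can read the same topic. The sketch below is one minimal way to do that, assuming the spark-streaming-kafka-0-10 integration is on the classpath and reusing the broker address (wanghy:9092) and topic name (1711f) from the producer above; the object name and group.id are made up:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object MockDataConsumer {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("MockDataConsumer")
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "wanghy:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "mock-data-check",
      "auto.offset.reset" -> "earliest"
    )

    // Subscribe to the producer's topic and print the first records of each 5-second batch
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Array("1711f"), kafkaParams))
    stream.map(_.value()).print()

    ssc.start()
    ssc.awaitTermination()
  }
}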

Original post: https://www.cnblogs.com/whyuan/p/13080134.html