// Tags: day gis field size function schema mock reg rgs
package spark.action.factory;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import java.util.*;
/**
 * Generates mock traffic-camera data and exposes it as Spark DataFrames / temporary views.
 *
 * <p>Two temporary views are registered on the session:
 * <ul>
 *   <li>{@code monitor_flow_action}: one row per mocked camera capture of a car;</li>
 *   <li>{@code monitor_camera_info}: (monitor_id, camera_id) pairs derived from the flow rows,
 *       plus a few extra random camera ids injected every {@value #EXTRA_CAMERA_INTERVAL} rows.</li>
 * </ul>
 *
 * @author 雪瞳
 * @Slogan The clock keeps moving forward; how could we stop here!
 * @Function Mock data and create DataFrames from it.
 */
public class MockData {

    /**
     * Province prefixes used for mocked licence plates.
     * NOTE(review): "京" appears twice, so it is drawn twice as often; possibly a typo
     * (e.g. for "津") or intentional weighting. Kept as-is, so confirm.
     */
    private static final String[] LOCATIONS =
            new String[]{"鲁", "京", "冀", "鄂", "粤", "沪", "京", "深", "蒙", "川"};

    /** Number of distinct mocked cars. */
    private static final int CAR_COUNT = 3000;

    /** Upper bound (inclusive) on how many camera captures a single car gets. */
    private static final int MAX_CAPTURES_PER_CAR = 300;

    /** After every this-many captures, the car's base hour advances by one. */
    private static final int CAPTURES_PER_HOUR = 30;

    /** Last valid hour of a day; advanced hours are clamped to it so they never exceed 23. */
    private static final int LAST_HOUR = 23;

    /** Every this-many flow rows, one extra random camera id is attached to that row's monitor. */
    private static final int EXTRA_CAMERA_INTERVAL = 1000;

    public static void main(String[] args) {
        String master = "local";
        String appname = "dataFrame";
        SparkSession session = SparkSession.builder().master(master).appName(appname).getOrCreate();
        try {
            JavaSparkContext jsc = JavaSparkContext.fromSparkContext(session.sparkContext());
            Random random = new Random();

            // Vehicle flow data: build the rows, then create a DataFrame with a programmatic schema.
            List<Row> flowRows = mockFlowRows(random, DateUtils.getTodayDate());
            JavaRDD<Row> flowRdd = jsc.parallelize(flowRows);
            Dataset<Row> dataFrame = session.createDataFrame(flowRdd, flowSchema());
            System.err.println("车辆信息数据");
            dataFrame.show(50);
            dataFrame.createOrReplaceTempView("monitor_flow_action");

            // Monitor -> camera mapping. It uses a NEW list: the list handed to parallelize()
            // above may be referenced lazily by Spark, so it must not be mutated afterwards.
            List<Row> monitorCameraRows = mockMonitorCameraRows(flowRows, random);
            JavaRDD<Row> monitorCameraRdd = jsc.parallelize(monitorCameraRows);
            Dataset<Row> dataFrameTwo = session.createDataFrame(monitorCameraRdd, monitorCameraSchema());
            dataFrameTwo.createOrReplaceTempView("monitor_camera_info");
            System.err.println("路口与摄像头");
            dataFrameTwo.show(50);
        } finally {
            session.stop();
        }
    }

    /**
     * Builds the mocked vehicle-flow rows.
     * Column order matches {@link #flowSchema()}.
     *
     * @param random source of randomness
     * @param date   day stamp placed in every row; presumably "yyyy-MM-dd" as returned by
     *               DateUtils.getTodayDate() (TODO confirm)
     * @return a new mutable list of rows
     */
    private static List<Row> mockFlowRows(Random random, String date) {
        List<Row> rows = new ArrayList<>();
        for (int i = 0; i < CAR_COUNT; i++) {
            String car = mockCarPlate(random);
            int hour = random.nextInt(24);
            // Draw the capture count ONCE per car. Putting the draw in the loop condition
            // would re-roll it on every iteration.
            int captures = random.nextInt(MAX_CAPTURES_PER_CAR) + 1;
            for (int j = 0; j < captures; j++) {
                // Every CAPTURES_PER_HOUR captures the hour advances, clamped so it stays a valid hour.
                if (j % CAPTURES_PER_HOUR == 0 && j != 0) {
                    hour = Math.min(hour + 1, LAST_HOUR);
                }
                String baseActionTime = date + " " + StringUtils.fullFillTwoBites(String.valueOf(hour));
                // Area id 1-8, padded to 2 characters.
                String areaId = StringUtils.fullFillNumBites(2, String.valueOf(random.nextInt(8) + 1));
                // Road id 1-50.
                String roadId = String.valueOf(random.nextInt(50) + 1);
                // Monitor (intersection) id 1-9, padded to 4 characters.
                String monitorId = StringUtils.fullFillNumBites(4, String.valueOf(random.nextInt(9) + 1));
                // Camera id 1-99999, so it never exceeds the 5-character width.
                String cameraId = StringUtils.fullFillNumBites(5, String.valueOf(random.nextInt(99999) + 1));
                // Capture time, e.g. 2018-01-01 20:09:10.
                String actionTime = baseActionTime
                        + ":" + StringUtils.fullFillTwoBites(String.valueOf(random.nextInt(60)))
                        + ":" + StringUtils.fullFillTwoBites(String.valueOf(random.nextInt(60)));
                // Speed 1-260.
                String speed = String.valueOf(random.nextInt(260) + 1);
                rows.add(RowFactory.create(date, monitorId, cameraId, car, actionTime, speed, roadId, areaId));
            }
        }
        return rows;
    }

    /**
     * Builds a mocked plate: a province prefix, one upper-case letter A-Z, and a number
     * padded by {@code StringUtils.fullFillNumBites(5, ...)}.
     */
    private static String mockCarPlate(Random random) {
        return LOCATIONS[random.nextInt(LOCATIONS.length)]
                + (char) ('A' + random.nextInt(26))
                + StringUtils.fullFillNumBites(5, String.valueOf(random.nextInt(10000)));
    }

    /**
     * Derives (monitor_id, camera_id) rows from the flow rows.
     *
     * <p>Every {@value #EXTRA_CAMERA_INTERVAL}-th flow row also attaches one random camera id
     * to its monitor, so the mapping contains some cameras that never appear in the flow data.
     *
     * @param flowRows rows produced by {@link #mockFlowRows}; column 1 is monitor_id, column 2 is camera_id
     * @param random   source of randomness for the injected camera ids
     * @return a new list of two-column rows matching {@link #monitorCameraSchema()}
     */
    private static List<Row> mockMonitorCameraRows(List<Row> flowRows, Random random) {
        Map<String, Set<String>> monitorAndCameras = new HashMap<>();
        int index = 0;
        for (Row row : flowRows) {
            Set<String> cameras = monitorAndCameras.computeIfAbsent(row.getString(1), k -> new HashSet<>());
            index++;
            if (index % EXTRA_CAMERA_INTERVAL == 0) {
                cameras.add(StringUtils.fullFillNumBites(5, String.valueOf(random.nextInt(100000))));
            }
            cameras.add(row.getString(2));
        }

        List<Row> rows = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : monitorAndCameras.entrySet()) {
            for (String cameraId : entry.getValue()) {
                rows.add(RowFactory.create(entry.getKey(), cameraId));
            }
        }
        return rows;
    }

    /** Schema of the {@code monitor_flow_action} view; all columns are nullable strings. */
    private static StructType flowSchema() {
        return DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("date", DataTypes.StringType, true),
                DataTypes.createStructField("monitor_id", DataTypes.StringType, true),
                DataTypes.createStructField("camera_id", DataTypes.StringType, true),
                DataTypes.createStructField("car", DataTypes.StringType, true),
                DataTypes.createStructField("action_time", DataTypes.StringType, true),
                DataTypes.createStructField("speed", DataTypes.StringType, true),
                DataTypes.createStructField("road_id", DataTypes.StringType, true),
                DataTypes.createStructField("area_id", DataTypes.StringType, true)
        ));
    }

    /** Schema of the {@code monitor_camera_info} view; both columns are nullable strings. */
    private static StructType monitorCameraSchema() {
        return DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("monitor_id", DataTypes.StringType, true),
                DataTypes.createStructField("camera_id", DataTypes.StringType, true)
        ));
    }
}
// Tags: day gis field size function schema mock reg rgs
// Original article: https://www.cnblogs.com/walxt/p/12853054.html