Download Spark (any version will do), then unpack it into bigdata (the directory name can be changed).
As for winutils.exe, please find it online yourselves; it is not attached here. The file is actually optional: the error it suppresses does not affect how Spark runs, so grab it only if, like me, the warning bugs you~~. Once downloaded, put the file under bigdata\hadoop\bin.
There is no need to create an environment variable; just define the system property at the very start of the Java code, like so:
System.setProperty("hadoop.home.dir", HADOOP_HOME);
Set up the directory hierarchy as follows:
parent directory (where the project lives)
- java-spark-sql-excel
- bigdata
  - spark
  - hadoop
    - bin
      - winutils.exe
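The initialization block and the readExcel method below live in one class and reference a shared SparkSession field plus two path constants that are never shown being declared. A minimal skeleton consistent with the layout above (the class name and exact paths are assumptions):

    import org.apache.spark.sql.SparkSession;

    public class ExcelSparkSql {  // class name assumed
        // assumed paths, matching the directory layout above
        private static final String HADOOP_HOME = "bigdata\\hadoop";
        private static final String SPARK_HOME = "bigdata\\spark";
        // shared session, created in the static block below
        private static SparkSession spark;
        // the static initializer, readExcel(...) and main(...) from this post go here
    }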
    static {
        // point Hadoop at the directory that holds bin\winutils.exe
        System.setProperty("hadoop.home.dir", HADOOP_HOME);
        // one local SparkSession shared by the whole class
        spark = SparkSession.builder()
                .appName("test")
                .master("local[*]")
                .config("spark.sql.warehouse.dir", SPARK_HOME)
                .config("spark.sql.parquet.binaryAsString", "true")
                .getOrCreate();
    }
    public static void readExcel(String filePath, String tableName) throws IOException {
        // prints integral numeric values without a decimal point
        DecimalFormat format = new DecimalFormat();
        format.applyPattern("#");
        // open the file (an uploaded file works too: CommonsMultipartFile under Spring MVC,
        // or org.glassfish.jersey.media.multipart.FormDataParam under Jersey; see my file-upload post)
        File file = new File(filePath);
        InputStream inputStream = new FileInputStream(file);
        BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream);
        // XSSFWorkbook reads .xlsx files, HSSFWorkbook reads .xls files
        Workbook workbook = null;
        if (file.getName().contains("xlsx"))
            workbook = new XSSFWorkbook(bufferedInputStream);
        if (file.getName().contains("xls") && !file.getName().contains("xlsx"))
            workbook = new HSSFWorkbook(bufferedInputStream);
        System.out.println(file.getName());
        // every sheet becomes one temporary table
        Iterator<Sheet> dataTypeSheets = workbook.sheetIterator();
        while (dataTypeSheets.hasNext()) {
            // column names, taken from the sheet's first row
            ArrayList<String> schemaList = new ArrayList<String>();
            // the sheet's data rows
            ArrayList<org.apache.spark.sql.Row> dataList = new ArrayList<org.apache.spark.sql.Row>();
            // Spark schema fields
            List<StructField> fields = new ArrayList<>();
            // current sheet
            Sheet dataTypeSheet = dataTypeSheets.next();
            Iterator<Row> iterator = dataTypeSheet.iterator();
            // skip sheets that have no rows at all
            if (!iterator.hasNext())
                continue;
            // the first row supplies the table structure
            Iterator<Cell> firstRowCellIterator = iterator.next().iterator();
            while (firstRowCellIterator.hasNext()) {
                Cell currentCell = firstRowCellIterator.next();
                // string header cell
                if (currentCell.getCellTypeEnum() == CellType.STRING)
                    schemaList.add(currentCell.getStringCellValue().trim());
                // numeric header cell
                if (currentCell.getCellTypeEnum() == CellType.NUMERIC)
                    schemaList.add((currentCell.getNumericCellValue() + "").trim());
            }
            // build one StructField per column (field name, field type, true = nullable)
            for (String fieldName : schemaList) {
                StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
                fields.add(field);
            }
            // assemble the Spark schema org.apache.spark.sql.types.StructType
            StructType schema = DataTypes.createStructType(fields);
            // number of columns
            int len = schemaList.size();
            // index of the sheet's last row
            int rowEnd = dataTypeSheet.getLastRowNum();
            // walk every data row of the sheet
            for (int rowNum = 1; rowNum <= rowEnd; rowNum++) {
                // one row becomes one list of strings
                ArrayList<String> rowDataList = new ArrayList<String>();
                Row r = dataTypeSheet.getRow(rowNum);
                if (r != null) {
                    // visit one cell per schema column
                    for (int cn = 0; cn < len; cn++) {
                        Cell c = r.getCell(cn, Row.MissingCellPolicy.RETURN_BLANK_AS_NULL);
                        if (c == null)
                            rowDataList.add("0"); // crude padding: empty cells become zero
                        if (c != null && c.getCellTypeEnum() == CellType.STRING)
                            rowDataList.add(c.getStringCellValue().trim()); // string cell
                        if (c != null && c.getCellTypeEnum() == CellType.NUMERIC) {
                            double value = c.getNumericCellValue();
                            if (p.matcher(value + "").matches())
                                rowDataList.add(format.format(value)); // integral: drop the decimal point
                            if (!p.matcher(value + "").matches())
                                rowDataList.add(value + ""); // fractional: keep the decimal point
                        }
                    }
                    // append the completed row; physically missing rows are skipped,
                    // so every stored Row has exactly len columns
                    dataList.add(RowFactory.create(rowDataList.toArray()));
                }
            }
            // register the data plus schema as a temp view named tableName + sheet name
            spark.createDataFrame(dataList, schema).createOrReplaceTempView(tableName + dataTypeSheet.getSheetName());
        }
    }
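readExcel also depends on a Pattern field p (used to decide whether a numeric cell holds an integral value) and on a number of POI and Spark imports that the post omits. A sketch of the missing pieces; the regex is an assumption, chosen to match the toString form of a double with no fractional part:

    import java.io.*;
    import java.text.DecimalFormat;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.regex.Pattern;

    import org.apache.poi.hssf.usermodel.HSSFWorkbook;
    import org.apache.poi.ss.usermodel.*;   // Workbook, Sheet, Row, Cell, CellType
    import org.apache.poi.xssf.usermodel.XSSFWorkbook;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    // assumed regex: "12.0" matches (treated as integral), "12.5" does not
    private static final Pattern p = Pattern.compile("-?\\d+\\.0");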
[Screenshots: the first, second, and third sheets of the test workbook]
    public static void main(String[] args) throws Exception {
        // paths of the Excel files to query
        String xlsxPath = "test2.xlsx";
        String xlsPath = "test.xls";
        // table name prefixes; each sheet is registered as tableNameN + the sheet's name
        String tableName1 = "test_table1";
        String tableName2 = "test_table2";
        readExcel(xlsxPath, tableName2);
        spark.sql("select * from " + tableName2 + "Sheet1").show();
        readExcel(xlsPath, tableName1);
        spark.sql("select * from " + tableName1 + "Sheet1").show();
        spark.sql("select * from " + tableName1 + "Sheet2").show();
        spark.sql("select * from " + tableName1 + "Sheet3").show();
    }
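Once registered, each sheet behaves like an ordinary table, so any Spark SQL statement works against it. For instance, a quick sanity check that needs no knowledge of the column names (this query is my own illustration, not from the post):

    // count the data rows that readExcel registered from the first sheet
    spark.sql("select count(*) from test_table1Sheet1").show();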
The Maven dependencies:

    <dependencies>
        <dependency>
            <groupId>org.spark-project.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>1.2.1.spark2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi</artifactId>
            <version>3.17</version>
        </dependency>
        <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi-ooxml</artifactId>
            <version>3.17</version>
        </dependency>
    </dependencies>
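One version caveat: getCellTypeEnum() and Row.MissingCellPolicy match the POI 3.17 declared here. In POI 4.x the enum-returning accessor was renamed back to getCellType(), so upgrading POI would mean a one-line change of the form:

    // POI 4.x spelling of the same check
    if (c != null && c.getCellType() == CellType.STRING)
        rowDataList.add(c.getStringCellValue().trim());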
Original post: https://www.cnblogs.com/shenyuchong/p/10291604.html