标签:
-- Staging table over the JSON files stored under the LOCATION below.
-- Each record has a return code, a return message and a `data` array of
-- structs; the files are parsed by the HCatalog JSON SerDe.
-- EXTERNAL: dropping this table leaves the underlying files in place.
-- Struct field names use the same spelling as the SELECT list further down.
CREATE EXTERNAL TABLE IF NOT EXISTS finance.json_serde_optd_table (
    retCode string,
    retMsg  string,
    data    array<struct<
        secID: string,
        tradeDate: date,
        optID: string,
        ticker: string,
        secShortName: string,
        exchangeCD: string,
        preSettlePrice: double,
        preClosePrice: double,
        openPrice: double,
        highestPrice: double,
        lowestPrice: double,
        closePrice: double,
        settlPrice: double,
        turnoverVol: double,
        turnoverValue: double,
        openInt: int
    >>
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 'hdfs://wdp.xxxxx.cn:8020/nifi/finance1/optd/';

-- Flattened table: explode() emits one row per element of the `data` array,
-- and each struct field becomes its own column.
CREATE TABLE IF NOT EXISTS finance.tb_optd AS
SELECT
    b.data.secID,
    b.data.tradeDate,
    b.data.optID,
    b.data.ticker,
    b.data.secShortName,
    b.data.exchangeCD,
    b.data.preSettlePrice,
    b.data.preClosePrice,
    b.data.openPrice,
    b.data.highestPrice,
    b.data.lowestPrice,
    b.data.closePrice,
    b.data.settlPrice,
    b.data.turnoverVol,
    b.data.turnoverValue,
    b.data.openInt
FROM finance.json_serde_optd_table
LATERAL VIEW explode(json_serde_optd_table.data) b AS data;
?
?
%dep z.load("/usr/hdp/2.4.2.0-258/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar");
?
// Describes one data set to import into Hive.
//   database  - Hive database that receives the tables
//   modelName - data set name; appended to hdfsPath and embedded in the table names
//   hdfsPath  - HDFS directory prefix under which the data set's files live
//   schema    - column definitions of the JSON staging table
//   schema_tb - SELECT list used to build the flattened table
case class HiveConfig(database: String, modelName: String, hdfsPath: String, schema: String, schema_tb: String)

// Data sets to import; later paragraphs append one HiveConfig per data set.
var hiveConfigList = List[HiveConfig]()
?
// Register the "equd" data set.

// Column definitions of the JSON staging table.
// NOTE(review): the field types were reconstructed from a garbled listing --
// confirm them against the actual JSON feed before relying on them.
val schema_json_equd_serde = """ retCode string, retMsg string, data array<struct<
    secID: string,
    tradeDate: date,
    ticker: string,
    secShortName: string,
    exchangeCD: string,
    preClosePrice: double,
    actPreClosePrice: double,
    openPrice: double,
    highestPrice: double,
    lowestPrice: double,
    closePrice: double,
    turnoverVol: double,
    turnoverValue: double,
    dealAmount: int,
    turnoverRate: double,
    accumAdjFactor: double,
    negMarketValue: double,
    marketValue: double,
    PE: double,
    PE1: double,
    PB: double,
    isOpen: int
>>"""

// SELECT list for the flattened table; `b.data` is the exploded array element.
var schema_equd = """b.data.secID, b.data.ticker, b.data.secShortName, b.data.exchangeCD, b.data.tradeDate, b.data.preClosePrice, b.data.actPreClosePrice, b.data.openPrice, b.data.highestPrice, b.data.lowestPrice, b.data.closePrice, b.data.turnoverVol, b.data.turnoverValue, b.data.dealAmount, b.data.turnoverRate, b.data.accumAdjFactor, b.data.negMarketValue, b.data.marketValue, b.data.PE, b.data.PE1, b.data.PB, b.data.isOpen"""

hiveConfigList = hiveConfigList :+ HiveConfig("finance", "equd", "hdfs://wdp.xxxxx.cn:8020/nifi/finance1/", schema_json_equd_serde, schema_equd)
?
// Register the "idxd" data set.

// Column definitions of the JSON staging table.
val schema_json_idxd_serde = """ retCode string, retMsg string, data array<struct<
    indexID: string,
    tradeDate: date,
    ticker: string,
    porgFullName: string,
    secShortName: string,
    exchangeCD: string,
    preCloseIndex: double,
    openIndex: double,
    lowestIndex: double,
    highestIndex: double,
    closeIndex: double,
    turnoverVol: double,
    turnoverValue: double,
    CHG: double,
    CHGPct: double
>>"""

// SELECT list for the flattened table; `b.data` is the exploded array element.
var schema_idxd = """b.data.indexID, b.data.tradeDate, b.data.ticker, b.data.porgFullName, b.data.secShortName, b.data.exchangeCD, b.data.preCloseIndex, b.data.openIndex, b.data.lowestIndex, b.data.highestIndex, b.data.closeIndex, b.data.turnoverVol, b.data.turnoverValue, b.data.CHG, b.data.CHGPct"""

hiveConfigList = hiveConfigList :+ HiveConfig("finance", "idxd", "hdfs://wdp.xxxxx.cn:8020/nifi/finance1/", schema_json_idxd_serde, schema_idxd)
?
// Creates the Hive objects for one data set:
//   1. the database, if it does not exist yet;
//   2. a JSON-SerDe staging table "json_serde_<modelName>_table" located at
//      hdfsPath + modelName + "/";
//   3. a flattened table "tb_<modelName>" built from the staging table's
//      exploded `data` array.
// Nothing is executed when args.database or args.schema is empty.
// Relies on the notebook-provided SparkContext `sc`.
def loadDataToHive(args: HiveConfig): Unit = {
  val loadPath = args.hdfsPath + args.modelName
  val tb_json_serde = "json_serde_" + args.modelName + "_table"
  val tb = "tb_" + args.modelName
  val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

  if (args.database != "" && args.schema != "") {
    print("正在创建项目..." + args.modelName)
    hiveContext.sql("CREATE DATABASE IF NOT EXISTS " + args.database)

    print("正在构造扩展模型...")
    // The SerDe class and the location must be wrapped in plain ASCII single
    // quotes; typographic quotes are not valid string delimiters in HiveQL.
    // NOTE(review): this is a managed table over a pre-existing location, so
    // dropping it would also remove the files there -- consider
    // CREATE EXTERNAL TABLE.
    hiveContext.sql(
      "CREATE TABLE IF NOT EXISTS " + args.database + "." + tb_json_serde +
        "(" + args.schema + ") row format serde 'org.apache.hive.hcatalog.data.JsonSerDe' LOCATION " +
        "'" + loadPath + "/'")

    // Build the CTAS statement once so the logged text and the executed text
    // cannot drift apart.
    val createFlatTable =
      "CREATE TABLE IF NOT EXISTS " + args.database + "." + tb +
        " as select " + args.schema_tb +
        " from " + args.database + "." + tb_json_serde +
        " LATERAL VIEW explode(" + tb_json_serde + ".data) b AS data"
    println(createFlatTable)
    hiveContext.sql(createFlatTable)

    println(args.modelName + " 扩展模型加载已完成!")
  }
}

// Show how many data sets are registered, then load each of them.
hiveConfigList.size
hiveConfigList.foreach { x => loadDataToHive(x) }
?
?
?
标签:
原文地址:http://www.cnblogs.com/fengwenit/p/5624292.html