bin/spark-shell --master spark://hadoop1:7077 --executor-memory 3g
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
// RDD demo
case class Person(name: String, age: Int)
val rddpeople = sc.textFile("/sparksql/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
rddpeople.registerAsTable("rddTable")
sqlContext.sql("SELECT name FROM rddTable WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)
Running it prints the names of people aged between 13 and 19.
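For reference, /sparksql/people.txt is assumed to have the same layout as the people.txt shipped with the Spark examples: one person per line, with a space after the comma, which is why the code above calls trim before toInt. Something like:
Michael, 29
Andy, 30
Justin, 19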
// Save rddpeople as a Parquet file for the next demo
rddpeople.saveAsParquetFile("/sparksql/people.parquet")
// Parquet demo
val parquetpeople = sqlContext.parquetFile("/sparksql/people.parquet")
parquetpeople.registerAsTable("parquetTable")
sqlContext.sql("SELECT name FROM parquetTable WHERE age >= 25").map(t => "Name: " + t(0)).collect().foreach(println)
Running it prints the names of people aged 25 or older.
// JSON demo
val jsonpeople = sqlContext.jsonFile("/sparksql/people.json")
jsonpeople.registerAsTable("jsonTable")
sqlContext.sql("SELECT name FROM jsonTable WHERE age >= 25").map(t => "Name: " + t(0)).collect().foreach(println)
Again, this prints the names of people aged 25 or older.
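If you want to check the schema that was inferred from the JSON file, SchemaRDD provides printSchema(); the field names and types shown depend entirely on the actual contents of people.json and people.parquet:
jsonpeople.printSchema()      // schema inferred from the JSON documents
parquetpeople.printSchema()   // schema stored in the Parquet file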
2: hiveContext basic usage
Before using hiveContext, confirm two things:
- the Spark build you are running includes Hive support;
- Hive's configuration file hive-site.xml is present in the conf directory.
The first point can be confirmed by checking whether the lib directory contains the three JARs whose names start with datanucleus; for the second, make sure hive-site.xml configures the uris used to reach the Hive metastore.
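A minimal sketch of such a metastore entry in conf/hive-site.xml (host and port are placeholders; adjust them to your own metastore service):
<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://hadoop1:9083</value>
    <description>Thrift URI of the remote Hive metastore (placeholder host/port)</description>
  </property>
</configuration>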
hiveContext.sql("use saledata") hiveContext.sql("show tables").collect().foreach(println)可以看到有在第五小节定义的3个表:
// Number of orders and total sales amount per year, across all orders
// Join the three tables, using count(distinct a.ordernumber) for the order count and sum(b.amount) for the total amount
hiveContext.sql("select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear order by c.theyear").collect().foreach(println)
This prints one row per year with the order count and the total sales amount.
/************************
 Largest single-order amount per year, across all orders:
 Step 1: compute the sales amount of each order together with its date
   select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber
 Step 2: use step 1 as a subquery, join it with tblDate, and take the yearly maximum
   select c.theyear,max(d.sumofamount) from tbldate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d on c.dateid=d.dateid group by c.theyear sort by c.theyear
*************************/
hiveContext.sql("select c.theyear,max(d.sumofamount) from tbldate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d on c.dateid=d.dateid group by c.theyear sort by c.theyear").collect().foreach(println)
This prints the largest single-order amount for each year.
/************************
 Best-selling item per year, across all orders:
 Step 1: compute the sales amount of each item in each year
   select c.theyear,b.itemid,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear,b.itemid
 Step 2: compute the maximum single-item sales amount per year
   select d.theyear,max(d.sumofamount) as maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear,b.itemid) d group by d.theyear
 Step 3: the item whose amount matches the yearly maximum is that year's best-selling item
   select distinct e.theyear,e.itemid,f.maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear,b.itemid) e join (select d.theyear,max(d.sumofamount) as maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear,b.itemid) d group by d.theyear) f on (e.theyear=f.theyear and e.sumofamount=f.maxofamount) order by e.theyear
*************************/
hiveContext.sql("select distinct e.theyear,e.itemid,f.maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear,b.itemid) e join (select d.theyear,max(d.sumofamount) as maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear,b.itemid) d group by d.theyear) f on (e.theyear=f.theyear and e.sumofamount=f.maxofamount) order by e.theyear").collect().foreach(println)
This prints, for each year, the best-selling item and its sales amount.
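The same three steps can also be run one at a time by registering each intermediate result as a temporary table, which makes the nested query easier to follow. A sketch for illustration only; the names yearItemAmount and yearMaxAmount are made up here:
// Step 1: per-year, per-item sales amount
hiveContext.sql("select c.theyear,b.itemid,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear,b.itemid").registerAsTable("yearItemAmount")
// Step 2: per-year maximum of those amounts
hiveContext.sql("select theyear,max(sumofamount) as maxofamount from yearItemAmount group by theyear").registerAsTable("yearMaxAmount")
// Step 3: the item matching the yearly maximum is the best-selling item
hiveContext.sql("select distinct e.theyear,e.itemid,f.maxofamount from yearItemAmount e join yearMaxAmount f on e.theyear=f.theyear and e.sumofamount=f.maxofamount order by e.theyear").collect().foreach(println)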
// Mixing sources within sqlContext
// Join the RDD-backed table rddTable with the Parquet-backed table parquetTable in the same sqlContext
sqlContext.sql("select a.name,a.age,b.age from rddTable a join parquetTable b on a.name=b.name").collect().foreach(println)
This prints the name and both age columns for the matching rows.
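The JSON-backed table jsonTable registered earlier in the same sqlContext can be mixed in exactly the same way; whether the join returns any rows depends on the names present in the two files:
sqlContext.sql("select a.name,a.age,b.age from rddTable a join jsonTable b on a.name=b.name").collect().foreach(println)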
// Mixing sources within hiveContext
// Create a table hiveTable and load data into it; note that the second column of people.txt has a space, so age is declared as string
hiveContext.sql("CREATE TABLE hiveTable(name string,age string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' ")
hiveContext.sql("LOAD DATA LOCAL INPATH '/home/mmicky/mboo/MyClass/doc/sparkSQL/data/people.txt' INTO TABLE hiveTable")
// Register a table parquetTable2 backed by the Parquet file, then mix it with hiveTable
hiveContext.parquetFile("/sparksql/people.parquet").registerAsTable("parquetTable2")
hiveContext.sql("select a.name,a.age,b.age from hiveTable a join parquetTable2 b on a.name=b.name").collect().foreach(println)
This prints the name and both age columns for the matching rows.
// Using cache in sqlContext
sqlContext.cacheTable("rddTable")
sqlContext.sql("SELECT name FROM rddTable WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)
sqlContext.sql("CACHE TABLE parquetTable")
sqlContext.sql("SELECT name FROM parquetTable WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)
The cached tables show up in the web UI. (Note that cache is lazy: the table is only materialized in memory once an action runs against it. uncache, by contrast, is eager and takes effect immediately.)
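As a rough way to see the effect (not a rigorous benchmark; the timing helper below is only for illustration), run the same query twice and compare elapsed times. The first run materializes the cache; the second should be served from the in-memory columnar store:
def time[A](body: => A): A = {
  val t0 = System.nanoTime()
  val result = body
  println("elapsed: " + (System.nanoTime() - t0) / 1e6 + " ms")
  result
}
time { sqlContext.sql("SELECT name FROM rddTable WHERE age >= 13 AND age <= 19").collect() }  // first run: builds the cache
time { sqlContext.sql("SELECT name FROM rddTable WHERE age >= 13 AND age <= 19").collect() }  // second run: reads from the cache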
sqlContext.uncacheTable("rddTable")
sqlContext.sql("UNCACHE TABLE parquetTable")
The same cache and uncache calls work on hiveContext as well.
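For example, a small sketch against the hiveTable created above (again, an action is needed before the table is actually cached):
hiveContext.cacheTable("hiveTable")
hiveContext.sql("select name from hiveTable").collect()   // the action triggers the actual caching
hiveContext.uncacheTable("hiveTable")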
// DSL demo
val teenagers_dsl = rddpeople.where('age >= 10).where('age <= 19).select('name)
teenagers_dsl.map(t => "Name: " + t(0)).collect().foreach(println)
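A couple more calls in the same Symbol-based DSL, sketched here for illustration; they rely only on the where/select operators shown above plus the standard count action:
val adults_dsl = rddpeople.where('age >= 25).select('name, 'age)
adults_dsl.collect().foreach(println)
println("number of adults: " + adults_dsl.count())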
Original article: http://blog.csdn.net/book_mmicky/article/details/39177041