ant clean package -Dhadoop.version=2.2.0 -Dhadoop-0.23.version=2.2.0 -Dhadoop.mr.rev=23
export HIVE_HOME=/app/hadoop/hive012/src/build/dist
export HIVE_DEV_HOME=/app/hadoop/hive012/src
export HADOOP_HOME=/app/hadoop/hadoop220
D: Start the console:
sbt/sbt hive/console
/* From sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala */
// The test tables that are defined in the Hive QTestUtil.
// /itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
val hiveQTestUtilTables = Seq(
  TestTable("src",
    "CREATE TABLE src (key INT, value STRING)".cmd,
    s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}' INTO TABLE src".cmd),
  TestTable("src1",
    "CREATE TABLE src1 (key INT, value STRING)".cmd,
    s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv3.txt")}' INTO TABLE src1".cmd),
  TestTable("srcpart", () => {
    runSqlHive(
      "CREATE TABLE srcpart (key INT, value STRING) PARTITIONED BY (ds STRING, hr STRING)")
    for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {
      runSqlHive(
        s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}'
           |OVERWRITE INTO TABLE srcpart PARTITION (ds='$ds',hr='$hr')
         """.stripMargin)
    }
  }),
  ......
)

Because the Hive 0.12 test data is used, two environment variables have to be defined: HIVE_HOME and HIVE_DEV_HOME. If Hive 0.13 is used instead, point them at the corresponding directories:
/* From sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala */
/** The location of the compiled hive distribution */
lazy val hiveHome = envVarToFile("HIVE_HOME")
/** The location of the hive source code. */
lazy val hiveDevHome = envVarToFile("HIVE_DEV_HOME")
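For reference, envVarToFile simply turns an environment variable, when it is set, into a file handle. The snippet below is only a minimal sketch of that idea; the real helper lives in TestHive.scala and may differ in detail:

import java.io.File

// Minimal sketch: wrap an environment variable, if it is set, in an Option[File].
// The actual helper in TestHive.scala behaves along these lines.
def envVarToFile(envVar: String): Option[File] =
  Option(System.getenv(envVar)).map(new File(_))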
/* From project/SparkBuild.scala */
object Hive {
  lazy val settings = Seq(
    javaOptions += "-XX:MaxPermSize=1g",
    // Multiple queries rely on the TestHive singleton. See comments there for more details.
    parallelExecution in Test := false,
    // Supporting all SerDes requires us to depend on deprecated APIs, so we turn off the warnings
    // only for this subproject.
    scalacOptions <<= scalacOptions map { currentOpts: Seq[String] =>
      currentOpts.filterNot(_ == "-deprecation")
    },
    initialCommands in console :=
      """
        |import org.apache.spark.sql.catalyst.analysis._
        |import org.apache.spark.sql.catalyst.dsl._
        |import org.apache.spark.sql.catalyst.errors._
        |import org.apache.spark.sql.catalyst.expressions._
        |import org.apache.spark.sql.catalyst.plans.logical._
        |import org.apache.spark.sql.catalyst.rules._
        |import org.apache.spark.sql.catalyst.types._
        |import org.apache.spark.sql.catalyst.util._
        |import org.apache.spark.sql.execution
        |import org.apache.spark.sql.hive._
        |import org.apache.spark.sql.hive.test.TestHive._
        |import org.apache.spark.sql.parquet.ParquetTestData""".stripMargin
  )
}
// Run the following in the console, line by line
case class Person(name: String, age: Int, state: String)
sparkContext.parallelize(Person("Michael",29,"CA")::Person("Andy",30,"NY")::Person("Justin",19,"CA")::Person("Justin",25,"CA")::Nil).registerTempTable("people")
val query = sql("select * from people")
query.printSchema
query.queryExecution
query.queryExecution.logical
query.queryExecution.analyzed
query.queryExecution.optimizedPlan
query.queryExecution.sparkPlan
query.toDebugString
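The four plans inspected above (parsed logical, analyzed, optimized, physical) can also be printed together. The helper below is only a convenience sketch for the console (dumpPlans is a made-up name, not a Spark API), built on the same queryExecution accessors used above:

// Hypothetical console helper (not part of Spark): print every phase of a
// SchemaRDD's query execution in one call.
def dumpPlans(rdd: org.apache.spark.sql.SchemaRDD): Unit = {
  val qe = rdd.queryExecution
  println("== Logical plan ==\n" + qe.logical)
  println("== Analyzed plan ==\n" + qe.analyzed)
  println("== Optimized plan ==\n" + qe.optimizedPlan)
  println("== Physical plan ==\n" + qe.sparkPlan)
}

dumpPlans(query)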
{ "fullname": "Sean Kelly", "org": "SK Consulting", "emailaddrs": [ {"type": "work", "value": "kelly@seankelly.biz"}, {"type": "home", "pref": 1, "value": "kelly@seankelly.tv"} ], "telephones": [ {"type": "work", "pref": 1, "value": "+1 214 555 1212"}, {"type": "fax", "value": "+1 214 555 1213"}, {"type": "mobile", "value": "+1 214 555 1214"} ], "addresses": [ {"type": "work", "format": "us", "value": "1234 Main StnSpringfield, TX 78080-1216"}, {"type": "home", "format": "us", "value": "5678 Main StnSpringfield, TX 78080-1316"} ], "urls": [ {"type": "work", "value": "http://seankelly.biz/"}, {"type": "home", "value": "http://seankelly.tv/"} ] }去空格和换行符后保存为/home/mmicky/data/nestjson.json,使用jsonFile读入并注册成表jsonPerson,然后定义一个查询jsonQuery:
jsonFile("/home/mmicky/data/nestjson.json").registerTempTable("jsonPerson") val jsonQuery = sql("select * from jsonPerson")
jsonQuery.printSchema
jsonQuery.queryExecution
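The nested schema printed above can be queried like any flat table. As a quick illustration (using only the top-level fields fullname and org), the same plan inspection works on a JSON-backed query:

// Query top-level fields of the JSON-backed table and inspect its plan.
val jsonNameQuery = sql("select fullname, org from jsonPerson")
jsonNameQuery.queryExecution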
parquetFile("/home/mmicky/data/spark/wiki_parquet").registerTempTable("parquetWiki") val parquetQuery = sql("select * from parquetWiki")
parquetQuery.printSchema
parquetQuery.queryExecution
val hiveQuery = sql("select * from sales")
hiveQuery.printSchema
hiveQuery.queryExecution
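The sales table here comes from the author's own Hive metastore. If you do not have such a table, the src test table that TestHive loads automatically (see the TestTable definitions above) works just as well for inspecting a Hive-backed plan:

// "src" is created and loaded by the hive/console test environment,
// so it can stand in for a real Hive table when inspecting plans.
val srcQuery = sql("select * from src")
srcQuery.printSchema
srcQuery.queryExecution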
sql("select state,avg(age) from people group by state").queryExecution
sql("select state,avg(age) from people group by state").toDebugString
sql("select a.name,b.name from people a join people b where a.name=b.name").queryExecution
sql("select a.name,b.name from people a join people b where a.name=b.name").toDebugString
sql("select distinct a.name,b.name from people a join people b where a.name=b.name").queryExecution
sql("select distinct a.name,b.name from people a join people b where a.name=b.name").toDebugString
sql("select name from (select * from people where age >=19) a where a.age <30").queryExecution
sql("select name from (select name,state as location from people) a where location='CA'").queryExecution
sql("select name,1+2 from people").queryExecution
object CombineFilters extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(c1, Filter(c2, grandChild)) =>
      Filter(And(c1, c2), grandChild)
  }
}
val query= sql("select * from people").where('age >=19).where('age <30) query.queryExecution.analyzed
CombineFilters(query.queryExecution.analyzed)
val hiveQuery = sql("SELECT * FROM (SELECT * FROM src) a") hiveQuery.queryExecution.analyzed
hiveQuery.queryExecution.analyzed transform {
  case Project(projectList, child) if projectList == child.output => child
}
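The same transform can also be packaged as a rule in the style of CombineFilters above. EliminateTrivialProject below is just an illustrative name for this sketch, not an actual Spark rule:

// Illustrative rule (hypothetical name): drop a Project that merely re-emits
// its child's output, mirroring the transform applied above.
object EliminateTrivialProject extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Project(projectList, child) if projectList == child.output => child
  }
}

EliminateTrivialProject(hiveQuery.queryExecution.analyzed)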
Spark SQL 1.1 Getting Started, Part 4: A Deep Dive into Spark SQL Execution Plans
Original article: http://blog.csdn.net/book_mmicky/article/details/40370607