SparkSQL on text files
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

case class PageViews(track_time: String, url: String, session_id: String, referer: String, ip: String, end_user_id: String, city_id: String)

val page_views = sc.textFile("hdfs://hadoop000:8020/sparksql/page_views.dat").map(_.split("\t")).map(p => PageViews(p(0), p(1), p(2), p(3), p(4), p(5), p(6)))
page_views.registerTempTable("page_views")

val sql1 = sql("SELECT track_time, url, session_id, referer, ip, end_user_id, city_id FROM page_views WHERE city_id = -1000 limit 10")
sql1.collect()

val sql2 = sql("SELECT session_id, count(*) c FROM page_views group by session_id order by c desc limit 10")
sql2.collect()
SparkSQL on Parquet files
SparkSQL can read data from Parquet files, and when writing to Parquet it preserves the schema as metadata. Columnar storage avoids reading columns that are not needed, which improves query performance and reduces GC pressure.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

case class Person(name: String, age: Int)

val people = sc.textFile("hdfs://hadoop000:8020/sparksql/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
people.saveAsParquetFile("hdfs://hadoop000:8020/sparksql/resources/people.parquet")  // write

val parquetFile = sqlContext.parquetFile("hdfs://hadoop000:8020/sparksql/resources/people.parquet")  // read
parquetFile.registerAsTable("parquetFile")

val teenagers = sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect
SparkSQL on JSON files
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

val path = "hdfs://hadoop000:8020/sparksql/resources/people.json"
val people = sqlContext.jsonFile(path)
people.printSchema()
people.registerTempTable("people")

val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenagers.collect

val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
anotherPeople.collect
SparkSQL with the DSL
With the DSL we can run SQL-style operations directly on the RDD we read in, without registering it as a table; Scala symbols stand for the columns of the table.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

case class Person(name: String, age: Int)

val people = sc.textFile("hdfs://hadoop000:8020/sparksql/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))

val teenagers = people.where('age >= 10).where('age <= 19).select('name)
teenagers.toDebugString
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
SparkSQL on existing Hive tables
Access via spark-shell:
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) import hiveContext._ sql("SELECT track_time, url, session_id, referer, ip, end_user_id, city_id FROM page_views WHERE city_id = -1000 limit 10").collect().foreach(println) sql("SELECT session_id, count(*) c FROM page_views group by session_id order by c desc limit 10").collect().foreach(println)
Access via spark-sql:
hive-site.xml must first be copied into $SPARK_HOME/conf.
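For example (a minimal sketch; the source path assumes Hive's configuration lives under $HIVE_HOME/conf — adjust it to your installation):

cp $HIVE_HOME/conf/hive-site.xml $SPARK_HOME/conf/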
SELECT track_time, url, session_id, referer, ip, end_user_id, city_id FROM page_views WHERE city_id = -1000 limit 10;
SELECT session_id, count(*) c FROM page_views group by session_id order by c desc limit 10;
Access via hive-thriftserver:
1) Start the hive-thriftserver:
cd $SPARK_HOME/sbin
./start-thriftserver.sh
To start on a specific port: ./start-thriftserver.sh --hiveconf hive.server2.thrift.port=14000
2) Start the beeline client:
cd $SPARK_HOME/bin
./beeline -u jdbc:hive2://hadoop000:10000/default -n spark
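If the thrift server was started on port 14000 as shown above, point beeline at that port instead (same host and user assumed):

./beeline -u jdbc:hive2://hadoop000:14000/default -n spark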
SELECT track_time, url, session_id, referer, ip, end_user_id, city_id FROM page_views WHERE city_id = -1000 limit 10;
SELECT session_id, count(*) c FROM page_views group by session_id order by c desc limit 10;
Caching tables in SparkSQL
Things to note as of Spark 1.2 and later:
1) Both SchemaRDD.cache and SQLContext.cacheTable cache the data in memory in a columnar format (a sketch using SchemaRDD.cache follows the examples below);
2) SQLContext.cacheTable/uncacheTable are now eager rather than lazy; there is no longer any need to trigger an action manually before the table is cached;
3) CACHE [LAZY] TABLE tb1 [AS SELECT ...] lets you choose LAZY or EAGER caching explicitly.
After cacheTable, watch the Storage tab of the Web UI to see the change.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) import hiveContext._ sql("cache table page_views") sql("select session_id, count(session_id) as c from page_views group by session_id order by c desc limit 10").collect().foreach(println) sql("uncache table page_views")
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) import hiveContext._ sql("CACHE TABLE page_views_cached_eager AS SELECT * FROM page_views") sql("select session_id, count(session_id) as c from page_views_cached_eager group by session_id order by c desc limit 10").collect().foreach(println) uncacheTable("page_views_cached")
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) import hiveContext._ sql("CACHE LAZY TABLE page_views_cached_lazy AS SELECT * FROM page_views") sql("select count(*) as c from page_views_cached_lazy").collect().foreach(println) sql("select session_id, count(session_id) as c from page_views_cached_lazy group by session_id order by c desc limit 10").collect().foreach(println) uncacheTable("page_views_cached")
Original post: http://www.cnblogs.com/luogankun/p/4203294.html