标签:
1. 文本导入
创建RDD的形式,测试txt文本
master=spark://master:7077
./bin/spark-shell
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc) sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@2013f094 scala> import sqlContext.createSchemaRDD import sqlContext.createSchemaRDD scala> case class Person(name: String, age: Int) defined class Person scala> val people = sc.textFile("/user/p.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)) 15/05/05 06:30:35 INFO storage.MemoryStore: ensureFreeSpace(190122) called with curMem=0, maxMem=278302556 15/05/05 06:30:35 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 185.7 KB, free 265.2 MB) 15/05/05 06:30:35 INFO storage.MemoryStore: ensureFreeSpace(29581) called with curMem=190122, maxMem=278302556 15/05/05 06:30:35 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 28.9 KB, free 265.2 MB) 15/05/05 06:30:35 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:59627 (size: 28.9 KB, free: 265.4 MB) 15/05/05 06:30:35 INFO storage.BlockManagerMaster: Updated info of block broadcast_0_piece0 15/05/05 06:30:35 INFO spark.DefaultExecutionContext: Created broadcast 0 from textFile at <console>:17 people: org.apache.spark.rdd.RDD[Person] = MappedRDD[3] at map at <console>:17 scala> people.registerTempTable("people") scala> val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 3 AN D age <= 19") teenagers: org.apache.spark.sql.SchemaRDD = SchemaRDD[6] at RDD at SchemaRDD.scala:108 == Query Plan == == Physical Plan == Project [name#0] Filter ((age#1 >= 3) && (age#1 <= 19)) PhysicalRDD [name#0,age#1], MapPartitionsRDD[4] at mapPartitions at ExistingRDD.scala:36 scala> teenagers.map(t => "Name: " + t(0)).collect().foreach(println) 15/05/05 06:31:18 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. 15/05/05 06:31:18 INFO mapred.FileInputFormat: Total input paths to process : 1 15/05/05 06:31:18 INFO spark.DefaultExecutionContext: Starting job: collect at <console>:18 15/05/05 06:31:18 INFO scheduler.DAGScheduler: Got job 0 (collect at <console>:18) with 2 output partitions (allowLocal=false) 15/05/05 06:31:18 INFO scheduler.DAGScheduler: Final stage: Stage 0(collect at <console>:18) 15/05/05 06:31:18 INFO scheduler.DAGScheduler: Parents of final stage: List() 15/05/05 06:31:18 INFO scheduler.DAGScheduler: Missing parents: List() 15/05/05 06:31:18 INFO scheduler.DAGScheduler: Submitting Stage 0 (MappedRDD[7] at map at <console>:18), which has no missing parents 15/05/05 06:31:18 INFO storage.MemoryStore: ensureFreeSpace(6400) called with curMem=219703, maxMem=278302556 15/05/05 06:31:18 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 6.3 KB, free 265.2 MB) 15/05/05 06:31:18 INFO storage.MemoryStore: ensureFreeSpace(4278) called with curMem=226103, maxMem=278302556 15/05/05 06:31:18 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.2 KB, free 265.2 MB) 15/05/05 06:31:18 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:59627 (size: 4.2 KB, free: 265.4 MB) 15/05/05 06:31:18 INFO storage.BlockManagerMaster: Updated info of block broadcast_1_piece0 15/05/05 06:31:18 INFO spark.DefaultExecutionContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838 15/05/05 06:31:18 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[7] at map at <console>:18) 15/05/05 06:31:18 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 15/05/05 06:31:18 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, ANY, 1293 bytes) 15/05/05 06:31:18 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, ANY, 1293 bytes) 15/05/05 06:31:18 INFO executor.Executor: Running task 1.0 in stage 0.0 (TID 1) 15/05/05 06:31:18 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0) 15/05/05 06:31:18 INFO rdd.HadoopRDD: Input split: hdfs://master:8020/user/p.txt:15+15 15/05/05 06:31:18 INFO rdd.HadoopRDD: Input split: hdfs://master:8020/user/p.txt:0+15 15/05/05 06:31:19 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 1755 bytes result sent to driver 15/05/05 06:31:19 INFO executor.Executor: Finished task 1.0 in stage 0.0 (TID 1). 1733 bytes result sent to driver 15/05/05 06:31:19 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 750 ms on localhost (1/2) 15/05/05 06:31:19 INFO scheduler.DAGScheduler: Stage 0 (collect at <console>:18) finished in 0.782 s 15/05/05 06:31:19 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 772 ms on localhost (2/2) 15/05/05 06:31:19 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 15/05/05 06:31:19 INFO scheduler.DAGScheduler: Job 0 finished: collect at <console>:18, took 0.860763 s Name: kang Name: wu Name: liu Name: zhang
Spark SQL支持的导入json格式,留着以后测试使用,参考这里
---------------------------------------------华丽分割线-----------------------------------------------------------------------------------
继续调研测试Spark SQL对关系型数据库的支持
数据源API通过Spark SQL提供了访问结构化数据的可插拔机制。数据源不仅仅有了简便的途径去进行数据转换并加入到Spark 平台。
使用数据源和通过SQL访问他们一样简单(或者你喜爱的Spark语言)
CREATE TEMPORARY TABLE episodes USING com.databricks.spark.avro OPTIONS (path "episodes.avro")
数据源API的另外一个优点就是不管数据的来源如何,用户都能够通过Spark支持的所有语言来操作这些数据 。例如,那些用Scala实现的数据源,pySpark用户不需要其他的库开发者做任何额外的工作就可以使用。此外,Spark SQL可以很容易的使用单一接口访问不同数据源的数据。
Spark SQL1.3
标签:
原文地址:http://www.cnblogs.com/kxdblog/p/4480454.html