
Spark: loading a JSON file from HDFS into a SQL table via RDD



RDD Definition

RDD stands for Resilient Distributed Dataset and is Spark's core abstraction. Through it you can read many kinds of data sources; here we demonstrate reading a file from HDFS. All Spark work happens on RDDs: creating new RDDs, transforming existing ones, and running computations over them to obtain results.

In Spark, an RDD is an immutable collection of objects. An RDD can be divided into multiple partitions, which are stored on different nodes.
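
To make these ideas concrete, here is a minimal spark-shell sketch (the path and the comma split are placeholders, not taken from the original log): a transformation returns a new RDD instead of modifying the original one, and only an action triggers the actual computation.

val lines  = sc.textFile("hdfs://namenode-host:9000/input/dean/sample.log")  // create an RDD (placeholder path)
val fields = lines.map(_.split(","))  // transformation: lines itself stays unchanged (immutable), evaluation is lazy
fields.count()                        // action: the file is actually read and processed here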


Creating an RDD

There are two ways to create one. The first is to load an external dataset, for example the HDFS file below, loaded in the spark-shell:

val textFile = sc.textFile("hdfs://namenode-host:9000/input/dean/obd_hdfs-writer-4-9-1447126914492.log")
textFile.count()
res1: Long = 3574 

The other way is to call SparkContext's parallelize method in the driver program; it is not covered in detail here.
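
For reference, a one-line illustration of that second approach (assuming only the spark-shell's built-in sc; the sample numbers are made up):

val nums = sc.parallelize(Seq(1, 2, 3, 4, 5))  // build an RDD from an in-memory collection
nums.map(_ * 2).collect()                      // Array(2, 4, 6, 8, 10)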

Reading the JSON File

The contents of the log file above are actually JSON, so it can be read a different way:

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@2f92b5a1

scala> val path = "hdfs://namenode-host:9000/input/dean/obd_hdfs-writer-4-9-1447126914492.log"
path: String = hdfs://namenode-host:9000/input/dean/obd_hdfs-writer-4-9-1447126914492.log

scala> val c = sqlContext.read.json(path)
c: org.apache.spark.sql.DataFrame = [data: struct<client_version:bigint,corp_id:string,east:bigint,ext_o_latitude:double,ext_o_longitude:double,gps_num:array<struct<east:bigint,gps_num:bigint,gpstime:bigint,latitude:double,longitude:double,msg_id:bigint,msg_length:bigint,msg_type:bigint,north:bigint,terminal:string,tsp_obd_n900_head:array<bigint>>>,gpstime:bigint,heading:bigint,k:string,latitude:double,longitude:double,msg_id:bigint,msg_length:bigint,msg_type:bigint,north:bigint,syn_type:bigint,systime_driverStorage:bigint,systime_listenerserver:bigint,target_id:string,target_name:string,terminal:string,terminal_id:string,terminal_status_desc:string,tsp_obd_n900_head:array<bigint>,type:bigint,update_time:bigint>, driverName: string, type: string]

scala> c.printSchema()
root
 |-- data: struct (nullable = true)
 |    |-- client_version: long (nullable = true)
 |    |-- corp_id: string (nullable = true)
 |    |-- east: long (nullable = true)
 |    |-- ext_o_latitude: double (nullable = true)
 |    |-- ext_o_longitude: double (nullable = true)
 |    |-- gps_num: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- east: long (nullable = true)
 |    |    |    |-- gps_num: long (nullable = true)
 |    |    |    |-- gpstime: long (nullable = true)
 |    |    |    |-- latitude: double (nullable = true)
 |    |    |    |-- longitude: double (nullable = true)
 |    |    |    |-- msg_id: long (nullable = true)
 |    |    |    |-- msg_length: long (nullable = true)
 |    |    |    |-- msg_type: long (nullable = true)
 |    |    |    |-- north: long (nullable = true)
 |    |    |    |-- terminal: string (nullable = true)
 |    |    |    |-- tsp_obd_n900_head: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |-- gpstime: long (nullable = true)
 |    |-- heading: long (nullable = true)
 |    |-- k: string (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- msg_id: long (nullable = true)
 |    |-- msg_length: long (nullable = true)
 |    |-- msg_type: long (nullable = true)
 |    |-- north: long (nullable = true)
 |    |-- syn_type: long (nullable = true)
 |    |-- systime_driverStorage: long (nullable = true)
 |    |-- systime_listenerserver: long (nullable = true)
 |    |-- target_id: string (nullable = true)
 |    |-- target_name: string (nullable = true)
 |    |-- terminal: string (nullable = true)
 |    |-- terminal_id: string (nullable = true)
 |    |-- terminal_status_desc: string (nullable = true)
 |    |-- tsp_obd_n900_head: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- type: long (nullable = true)
 |    |-- update_time: long (nullable = true)
 |-- driverName: string (nullable = true)
 |-- type: string (nullable = true)


Converting to a Table


Now register it as a temporary table named obd and iterate over its contents:

c.registerTempTable("obd")
val set = sqlContext.sql("select * from obd")
set.collect().foreach(println)

The nested JSON tree structure gets flattened automatically. Whether or not that is ideal, it at least produces a usable table.
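
Instead of selecting everything, the nested fields can also be queried directly with dot notation; a small sketch using column names taken from the schema printed above:

val positions = sqlContext.sql("select data.latitude, data.longitude, data.gpstime from obd")  // reach into the data struct
positions.show(5)  // print the first 5 rows as a table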


This pattern of mixing program code with SQL is interesting, but it still has some shortcomings. Since it is a program, you want features such as auto-completion, and spark-shell does not provide them.








Copyright notice: this is an original post by the author and may not be reproduced without permission.


Original article: http://blog.csdn.net/csfreebird/article/details/49766635
