RDD stands for Resilient Distributed Dataset and is Spark's core abstraction. Many kinds of files can be read through it; here we demonstrate reading a file from HDFS. All Spark work happens on RDDs: creating new RDDs, transforming existing RDDs, and computing results from existing RDDs.
In Spark an RDD is an immutable collection of objects. An RDD can be split into multiple partitions that are stored on different nodes.
There are two ways to create one. The first is to load an external dataset, for example loading a file from HDFS as below, run in spark-shell:
val textFile = sc.textFile("hdfs://namenode-host:9000/input/dean/obd_hdfs-writer-4-9-1447126914492.log")
textFile.count()
res1: Long = 3574
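Once textFile is loaded, the "transform an existing RDD, then compute a result" idea mentioned above looks like the following minimal sketch; the filter condition is only illustrative and assumes nothing about this particular log:

// transformation: builds a new RDD; textFile itself is never modified (RDDs are immutable)
val longLines = textFile.filter(line => line.length > 100)
// action: triggers the actual computation across the partitions and returns a result
longLines.count()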
The other way is to use SparkContext's parallelize method in the driver program, which is not discussed further here.
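For completeness, a minimal sketch of the parallelize route; the small list of numbers is just illustrative data:

// distribute a local Scala collection across the cluster as an RDD
val distData = sc.parallelize(Seq(1, 2, 3, 4, 5))
distData.reduce(_ + _)   // action: returns 15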
The content of the log file above is actually in JSON format, so it can be read a different way:
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@2f92b5a1

scala> val path = "hdfs://namenode-host:9000/input/dean/obd_hdfs-writer-4-9-1447126914492.log"
path: String = hdfs://namenode-host:9000/input/dean/obd_hdfs-writer-4-9-1447126914492.log

scala> val c = sqlContext.read.json(path)
c: org.apache.spark.sql.DataFrame = [data: struct<client_version:bigint,corp_id:string,east:bigint,ext_o_latitude:double,ext_o_longitude:double,gps_num:array<struct<east:bigint,gps_num:bigint,gpstime:bigint,latitude:double,longitude:double,msg_id:bigint,msg_length:bigint,msg_type:bigint,north:bigint,terminal:string,tsp_obd_n900_head:array<bigint>>>,gpstime:bigint,heading:bigint,k:string,latitude:double,longitude:double,msg_id:bigint,msg_length:bigint,msg_type:bigint,north:bigint,syn_type:bigint,systime_driverStorage:bigint,systime_listenerserver:bigint,target_id:string,target_name:string,terminal:string,terminal_id:string,terminal_status_desc:string,tsp_obd_n900_head:array<bigint>,type:bigint,update_time:bigint>, driverName: string, type: string]

scala> c.printSchema()
root
 |-- data: struct (nullable = true)
 |    |-- client_version: long (nullable = true)
 |    |-- corp_id: string (nullable = true)
 |    |-- east: long (nullable = true)
 |    |-- ext_o_latitude: double (nullable = true)
 |    |-- ext_o_longitude: double (nullable = true)
 |    |-- gps_num: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- east: long (nullable = true)
 |    |    |    |-- gps_num: long (nullable = true)
 |    |    |    |-- gpstime: long (nullable = true)
 |    |    |    |-- latitude: double (nullable = true)
 |    |    |    |-- longitude: double (nullable = true)
 |    |    |    |-- msg_id: long (nullable = true)
 |    |    |    |-- msg_length: long (nullable = true)
 |    |    |    |-- msg_type: long (nullable = true)
 |    |    |    |-- north: long (nullable = true)
 |    |    |    |-- terminal: string (nullable = true)
 |    |    |    |-- tsp_obd_n900_head: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |-- gpstime: long (nullable = true)
 |    |-- heading: long (nullable = true)
 |    |-- k: string (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- msg_id: long (nullable = true)
 |    |-- msg_length: long (nullable = true)
 |    |-- msg_type: long (nullable = true)
 |    |-- north: long (nullable = true)
 |    |-- syn_type: long (nullable = true)
 |    |-- systime_driverStorage: long (nullable = true)
 |    |-- systime_listenerserver: long (nullable = true)
 |    |-- target_id: string (nullable = true)
 |    |-- target_name: string (nullable = true)
 |    |-- terminal: string (nullable = true)
 |    |-- terminal_id: string (nullable = true)
 |    |-- terminal_status_desc: string (nullable = true)
 |    |-- tsp_obd_n900_head: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- type: long (nullable = true)
 |    |-- update_time: long (nullable = true)
 |-- driverName: string (nullable = true)
 |-- type: string (nullable = true)
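As an aside, the resulting DataFrame can also be queried directly through the DataFrame API instead of SQL. A minimal sketch, assuming the column names shown in the schema above (nested fields are addressed with dot notation; the filter value is arbitrary):

// pick a few nested columns by dot path and preview them
c.select("driverName", "data.latitude", "data.longitude").show(5)
// filter on a nested field using a SQL expression string
c.filter("data.msg_type = 1").count()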
Now register it as a temporary table named obd and walk through the table's contents:
c.registerTempTable("obd")
val set = sqlContext.sql("select * from obd")
set.collect().foreach(println)
The JSON tree structure gets flattened automatically in the output. Good or not, it is at least a usable table.
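If whole flattened rows are more than you need, individual nested fields can be selected in the SQL itself. A minimal sketch, assuming the field names from the schema printed above:

val points = sqlContext.sql(
  "select data.terminal, data.gpstime, data.latitude, data.longitude from obd")
points.show(10)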
This pattern of mixing program code with SQL is interesting, but it still has some shortcomings. Since it is code, you want things like auto-completion, which spark-shell does not provide.
Loading a JSON file from HDFS into a SQL table with Spark RDDs
Original post: http://blog.csdn.net/csfreebird/article/details/49766635