
Week 5 Tuesday Exercise: Lab 5, Introductory Spark SQL Programming Practice

Posted: 2019-03-26 21:13:29


1. Problem:

(screenshot of the assignment)

Source code:

import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{max, sum}

object TestMySQL {
  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("RddToDFrame").master("local").getOrCreate()
    import spark.implicits._

    // Build an RDD of Row objects from two hard-coded records
    val employeeRDD = spark.sparkContext.parallelize(Array("3 Mary F 26", "4 Tom M 23")).map(_.split(" "))
    val schema = StructType(List(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("gender", StringType, true),
      StructField("age", IntegerType, true)))
    val rowRDD = employeeRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
    val employeeDF = spark.createDataFrame(rowRDD, schema)

    // Append the two rows to the MySQL table sparktest.employee
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "hadoop")
    prop.put("driver", "com.mysql.jdbc.Driver")
    employeeDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/sparktest", "sparktest.employee", prop)

    // Read the table back and compute max(age) and sum(age).
    // Note: agg("age" -> "max", "age" -> "sum") converts the pairs to a
    // Map, so the duplicate "age" key silently drops the max; the column
    // functions are used instead.
    val jdbcDF = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/sparktest")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "employee")
      .option("user", "root")
      .option("password", "hadoop")
      .load()
    jdbcDF.agg(max("age"), sum("age")).show()
    print("ok")
  }
}
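Running this program requires the MySQL JDBC driver on Spark's classpath. A typical launch might look like the following sketch; the jar path, connector version, and application jar name are assumptions to be adjusted for your installation:

```shell
# Paths and versions below are assumptions -- adjust to your environment
spark-submit \
  --class TestMySQL \
  --jars /usr/local/spark/jars/mysql-connector-java-5.1.40-bin.jar \
  --driver-class-path /usr/local/spark/jars/mysql-connector-java-5.1.40-bin.jar \
  target/scala-2.11/simple-project_2.11-1.0.jar
```

The same `--jars`/`--driver-class-path` flags apply when experimenting interactively in `spark-shell`.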

Database contents:

(screenshot of the MySQL table)

 

Result:

(screenshot of the aggregation output)
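To confirm that the append succeeded, the table can also be inspected directly from the mysql client (database and table names are taken from the code above):

```sql
USE sparktest;
SELECT * FROM employee;
```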

2. Converting an RDD to a DataFrame programmatically

(screenshot of the assignment)

 

The official documentation gives two methods; one is shown here (using the programmatic interface: construct a schema and apply it to an existing RDD):

Source code:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object RDDtoDF {
  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("RddToDFrame").master("local").getOrCreate()
    import spark.implicits._

    // Load the raw text file; each line has the form "id,name,age"
    val employeeRDD = spark.sparkContext.textFile("file:///usr/local/spark/employee.txt")

    // Build the schema from a string; every field is a nullable string
    val schemaString = "id name age"
    val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)

    // Convert each line to a Row and apply the schema
    val rowRDD = employeeRDD.map(_.split(",")).map(attributes => Row(attributes(0).trim, attributes(1), attributes(2).trim))
    val employeeDF = spark.createDataFrame(rowRDD, schema)

    // Register a temporary view and query it with SQL
    employeeDF.createOrReplaceTempView("employee")
    val results = spark.sql("SELECT id,name,age FROM employee")
    results.map(t => "id:" + t(0) + ",name:" + t(1) + ",age:" + t(2)).show()
  }
}

 

Result:

(screenshot of the query output)
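For completeness, the other method mentioned above, inferring the schema by reflection from a case class, can be sketched as follows. The input file and field names mirror the example above; the numeric types for id and age are an assumption, since the schema-string version treats every field as a string:

```scala
import org.apache.spark.sql.SparkSession

// The case class fields define the schema via reflection
case class Employee(id: Long, name: String, age: Long)

object RDDtoDFReflect {
  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("RddToDFrame").master("local").getOrCreate()
    import spark.implicits._

    // Same input file as above; toDF() infers the schema from Employee
    val employeeDF = spark.sparkContext
      .textFile("file:///usr/local/spark/employee.txt")
      .map(_.split(","))
      .map(attributes => Employee(attributes(0).trim.toLong, attributes(1), attributes(2).trim.toLong))
      .toDF()

    employeeDF.createOrReplaceTempView("employee")
    spark.sql("SELECT id,name,age FROM employee").show()
  }
}
```

This version needs no explicit StructType, but it requires the column types to be fixed at compile time, whereas the programmatic interface can build the schema from runtime data.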

 



Original post: https://www.cnblogs.com/mm20/p/10603428.html
