码迷,mamicode.com
首页 > 数据库 > 详细

Spark SQL External Data Sources JDBC官方实现写测试

时间:2015-02-05 17:52:44      阅读:1163      评论:0      收藏:0      [点我收藏+]

标签:

通过Spark SQL External Data Sources JDBC实现将RDD的数据写入到MySQL数据库中。

jdbc.scala重要API介绍:

/**
 * Save this RDD to a JDBC database at `url` under the table name `table`.
 * This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements.
 * If you pass `true` for `allowExisting`, it will drop any table with the
 * given name; if you pass `false`, it will throw if the table already
 * exists.
 */
def createJDBCTable(url: String, table: String, allowExisting: Boolean) 


/**
 * Save this RDD to a JDBC database at `url` under the table name `table`.
 * Assumes the table already exists and has a compatible schema.  If you
 * pass `true` for `overwrite`, it will `TRUNCATE` the table before
 * performing the `INSERT`s.
 *
 * The table must already exist on the database.  It must have a schema
 * that is compatible with the schema of this RDD; inserting the rows of
 * the RDD in order via the simple statement
 * `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail.
 */
def insertIntoJDBC(url: String, table: String, overwrite: Boolean) 

 

import org.apache.spark.sql.SQLContext  
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val sqlContext  = new SQLContext(sc)
import sqlContext._

#数据准备
val url = "jdbc:mysql://hadoop000:3306/test?user=root&password=root"

val arr2x2 = Array[Row](Row.apply("dave", 42), Row.apply("mary", 222))
val arr1x2 = Array[Row](Row.apply("fred", 3))
val schema2 = StructType(StructField("name", StringType) :: StructField("id", IntegerType) :: Nil)

val arr2x3 = Array[Row](Row.apply("dave", 42, 1), Row.apply("mary", 222, 2))
val schema3 = StructType(StructField("name", StringType) :: StructField("id", IntegerType) :: StructField("seq", IntegerType) :: Nil) 

import org.apache.spark.sql.jdbc._

================================CREATE======================================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)

srdd.createJDBCTable(url, "person", false)
sqlContext.jdbcRDD(url, "person").collect.foreach(println)
[dave,42]
[mary,222]

==============================CREATE with overwrite========================================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x3), schema3)
srdd.createJDBCTable(url, "person2", false)
sqlContext.jdbcRDD(url, "person2").collect.foreach(println)
[mary,222,2]
[dave,42,1]

val srdd2 = sqlContext.applySchema(sc.parallelize(arr1x2), schema2)
srdd2.createJDBCTable(url, "person2", true)
sqlContext.jdbcRDD(url, "person2").collect.foreach(println)
[fred,3]

================================CREATE then INSERT to append======================================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)
val srdd2 = sqlContext.applySchema(sc.parallelize(arr1x2), schema2)
srdd.createJDBCTable(url, "person3", false)
sqlContext.jdbcRDD(url, "person3").collect.foreach(println)
[mary,222]
[dave,42]

srdd2.insertIntoJDBC(url, "person3", false)
sqlContext.jdbcRDD(url, "person3").collect.foreach(println)
[mary,222]
[dave,42]
[fred,3]

================================CREATE then INSERT to truncate======================================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)
val srdd2 = sqlContext.applySchema(sc.parallelize(arr1x2), schema2)

srdd.createJDBCTable(url, "person4", false)
sqlContext.jdbcRDD(url, "person4").collect.foreach(println)
[dave,42]
[mary,222]

srdd2.insertIntoJDBC(url, "person4", true)
[fred,3]

================================Incompatible INSERT to append======================================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)
val srdd2 = sqlContext.applySchema(sc.parallelize(arr2x3), schema3)
srdd.createJDBCTable(url, "person5", false)
srdd2.insertIntoJDBC(url, "person5", true)
    java.sql.SQLException: Column count doesnt match value count at row 1

 

Spark SQL External Data Sources JDBC官方实现写测试

标签:

原文地址:http://www.cnblogs.com/luogankun/p/4275213.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!