标签:
通过Spark SQL External Data Sources JDBC实现将RDD的数据写入到MySQL数据库中。
jdbc.scala重要API介绍:
/**
 * Save this RDD to a JDBC database at `url` under the table name `table`.
 * This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements.
 * If you pass `true` for `allowExisting`, it will drop any table with the
 * given name; if you pass `false`, it will throw if the table already
 * exists.
 *
 * @param url           JDBC URL of the target database
 * @param table         name of the table to create and fill
 * @param allowExisting `true` to drop an existing table with the same name,
 *                      `false` to fail when the table already exists
 */
def createJDBCTable(url: String, table: String, allowExisting: Boolean)

/**
 * Save this RDD to a JDBC database at `url` under the table name `table`.
 * Assumes the table already exists and has a compatible schema. If you
 * pass `true` for `overwrite`, it will `TRUNCATE` the table before
 * performing the `INSERT`s.
 *
 * The table must already exist on the database. It must have a schema
 * that is compatible with the schema of this RDD; inserting the rows of
 * the RDD in order via the simple statement
 * `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail.
 *
 * @param url       JDBC URL of the target database
 * @param table     name of the existing table to insert into
 * @param overwrite `true` to `TRUNCATE` the table before inserting,
 *                  `false` to append to the rows already present
 */
def insertIntoJDBC(url: String, table: String, overwrite: Boolean)
// spark-shell walkthrough: write RDD data to MySQL through the Spark SQL
// JDBC external data source (createJDBCTable / insertIntoJDBC).
//
// NOTE(review): `sc` is not defined here; this assumes the SparkContext that
// spark-shell provides. Enter the scenarios statement by statement: `srdd` and
// `srdd2` are re-declared in each scenario, which relies on REPL shadowing.
// Lines starting with `// =>` are the results shown in the original session.

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val sqlContext = new SQLContext(sc)
import sqlContext._

// Data preparation
val url = "jdbc:mysql://hadoop000:3306/test?user=root&password=root"

val arr2x2 = Array[Row](Row.apply("dave", 42), Row.apply("mary", 222))
val arr1x2 = Array[Row](Row.apply("fred", 3))
val schema2 = StructType(StructField("name", StringType) :: StructField("id", IntegerType) :: Nil)

val arr2x3 = Array[Row](Row.apply("dave", 42, 1), Row.apply("mary", 222, 2))
val schema3 = StructType(StructField("name", StringType) :: StructField("id", IntegerType) :: StructField("seq", IntegerType) :: Nil)

import org.apache.spark.sql.jdbc._

// ================================ CREATE ================================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)

srdd.createJDBCTable(url, "person", false)
sqlContext.jdbcRDD(url, "person").collect.foreach(println)
// => [dave,42]
// => [mary,222]

// ========================= CREATE with overwrite ========================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x3), schema3)
srdd.createJDBCTable(url, "person2", false)
sqlContext.jdbcRDD(url, "person2").collect.foreach(println)
// => [mary,222,2]
// => [dave,42,1]

// allowExisting = true: the existing three-column table is replaced.
val srdd2 = sqlContext.applySchema(sc.parallelize(arr1x2), schema2)
srdd2.createJDBCTable(url, "person2", true)
sqlContext.jdbcRDD(url, "person2").collect.foreach(println)
// => [fred,3]

// ===================== CREATE then INSERT to append =====================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)
val srdd2 = sqlContext.applySchema(sc.parallelize(arr1x2), schema2)
srdd.createJDBCTable(url, "person3", false)
sqlContext.jdbcRDD(url, "person3").collect.foreach(println)
// => [mary,222]
// => [dave,42]

// overwrite = false: rows are appended to the existing ones.
srdd2.insertIntoJDBC(url, "person3", false)
sqlContext.jdbcRDD(url, "person3").collect.foreach(println)
// => [mary,222]
// => [dave,42]
// => [fred,3]

// ==================== CREATE then INSERT to truncate ====================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)
val srdd2 = sqlContext.applySchema(sc.parallelize(arr1x2), schema2)
srdd.createJDBCTable(url, "person4", false)
sqlContext.jdbcRDD(url, "person4").collect.foreach(println)
// => [dave,42]
// => [mary,222]

// overwrite = true: the table is truncated before the insert.
srdd2.insertIntoJDBC(url, "person4", true)
// Read the table back so the result below is actually produced; the original
// session showed the row without the query that prints it.
sqlContext.jdbcRDD(url, "person4").collect.foreach(println)
// => [fred,3]

// ==================== Incompatible INSERT to append =====================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)
val srdd2 = sqlContext.applySchema(sc.parallelize(arr2x3), schema3)
srdd.createJDBCTable(url, "person5", false)
// Inserting three-column rows into the two-column table fails:
srdd2.insertIntoJDBC(url, "person5", true)
// => java.sql.SQLException: Column count doesn't match value count at row 1
Spark SQL External Data Sources JDBC官方实现写测试
标签:
原文地址:http://www.cnblogs.com/luogankun/p/4275213.html