标签:red pre contex set dstat int ati prepare driver
/**
 * Writes every (location, count) pair of one partition into the MySQL table
 * `location_info`, stamping each row with the current date.
 *
 * A single connection and a single prepared statement are shared by the whole
 * partition. Failures are reported on stdout and not rethrown (best effort).
 *
 * @param iterator the (location, count) pairs of one partition
 */
val data2Mysql2 = (iterator: Iterator[(String, Int)]) => {
  val sql = "Insert into location_info(location,counts,accesse_date) values(?,?,?)"
  try {
    // One connection for the whole partition.
    val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root")
    try {
      // Prepare the statement once and re-bind it per row; preparing inside the
      // loop would leave every statement but the last one unclosed.
      val ps = conn.prepareStatement(sql)
      try {
        iterator.foreach { case (location, count) =>
          ps.setString(1, location)
          ps.setInt(2, count)
          ps.setDate(3, new Date(System.currentTimeMillis()))
          ps.executeUpdate()
        }
      } finally {
        ps.close()
      }
    } finally {
      conn.close()
    }
  } catch {
    // Keep the original best-effort behaviour, but do not hide the cause.
    case e: Exception => println(s"Mysql Exception: $e")
  }
}
// Hands each partition of rddres2 (presumably a Spark RDD of (String, Int) — confirm) to the MySQL writer.
// NOTE(review): the writer defined just above is named data2Mysql2, while this call passes
// data2MySQL, which is not defined in the visible code — confirm which one is intended.
rddres2.foreachPartition(data2MySQL)
/**
 * Reads (id, location) rows from the MySQL table `location_info` through a
 * [[JdbcRDD]] on a local two-thread Spark context and prints them.
 *
 * The Spark context is always stopped, even when the read fails.
 */
def mysql2Spark(): Unit = {
  val conf = new SparkConf().setAppName("JdbcRDDDemo").setMaster("local[2]")
  val sc = new SparkContext(conf)
  try {
    // Connection factory passed to JdbcRDD.
    val connection = () => {
      Class.forName("com.mysql.jdbc.Driver").newInstance()
      DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root")
    }
    val jdbcRDD = new JdbcRDD(
      sc,
      connection,
      // The two '?' placeholders are bound from the bounds below.
      "SELECT id, location FROM location_info where id >= ? AND id <= ?",
      1, // lower bound of id
      4, // upper bound of id
      2, // number of partitions
      r => {
        val id = r.getInt(1)
        val code = r.getString(2)
        (id, code)
      }
    )
    // Collect once: every collect() re-runs the JDBC query.
    val rows = jdbcRDD.collect()
    println(rows.toBuffer)
  } finally {
    sc.stop()
  }
}
标签:red pre contex set dstat int ati prepare driver
原文地址:http://www.cnblogs.com/rocky-AGE-24/p/7297781.html