标签:code tar nap ndt com each poc pass als
import java.sql.DriverManager import java.time.{LocalDateTime, ZoneOffset} import org.apache.spark.rdd.JdbcRDD import org.apache.spark.{SparkConf, SparkContext} // spark-submit --master local[*] --jars /root/sparkjob/mysql-connector-java-5.1.38.jar --class com.zxb.sparkapplication.readwrite.SparkReadMysql /root/sparkjob/original-scalatest-1.0-SNAPSHOT.jar /** * spark读取mysql数据 */ object SparkReadMysql { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("spark write mysql") val sc = new SparkContext(conf) // 连接mysql相关配置信息 val driverClassName = "com.mysql.jdbc.Driver" val url = "jdbc:mysql://ip:3306/xunwu?characterEncoding=utf8&useSSL=false" val user = "root" val password = "123456" //mysql里时间类型为datetime,传入的条件为时间戳 val sql = "select id,title,price,area from house where create_time > from_unixtime(?) and create_time < from_unixtime(?)" val connection = () => { Class.forName(driverClassName) DriverManager.getConnection(url, user, password) } val startTime = LocalDateTime.of(2017, 1, 3, 0, 0, 0) val endTime = LocalDateTime.of(2019, 11, 4, 0, 0) //mysql的时间戳只有10位,需要把java里的13位时间戳降低精度,直接除以1000 val startTimeStamp = startTime.toInstant(ZoneOffset.ofHours(8)).toEpochMilli / 1000 val endTimeStamp = endTime.toInstant(ZoneOffset.ofHours(8)).toEpochMilli / 1000 println("startTime: " + startTime + ", endTime: " + endTime) println("startTime: " + startTimeStamp + ", endTime: " + endTimeStamp) //读取 val result: JdbcRDD[(Int, String, Int, Int)] = new JdbcRDD[(Int, String, Int, Int)]( sc, connection, sql, startTimeStamp, endTimeStamp, 1, rs => { val id = rs.getInt(1) val title = rs.getString(2) val price = rs.getInt(3) val area = rs.getInt(4) (id,title,price,area) } ) result.collect().foreach(println) sc.stop() } }
标签:code tar nap ndt com each poc pass als
原文地址:https://www.cnblogs.com/zxbdboke/p/12749537.html