码迷,mamicode.com
首页 > 数据库 > 详细

spark读取mysql

时间:2020-04-22 10:13:02      阅读:86      评论:0      收藏:0      [点我收藏+]

标签:code   tar   nap   ndt   com   each   poc   pass   als   

import java.sql.DriverManager
import java.time.{LocalDateTime, ZoneOffset}

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

// spark-submit --master local[*] --jars /root/sparkjob/mysql-connector-java-5.1.38.jar --class com.zxb.sparkapplication.readwrite.SparkReadMysql /root/sparkjob/original-scalatest-1.0-SNAPSHOT.jar

/**
  * spark读取mysql数据
  */
/**
  * Reads rows from a MySQL `house` table through Spark's [[JdbcRDD]],
  * restricted to a `create_time` range, and prints each
  * `(id, title, price, area)` tuple to stdout.
  *
  * Submit example:
  * {{{
  * spark-submit --master local[*] \
  *   --jars /root/sparkjob/mysql-connector-java-5.1.38.jar \
  *   --class com.zxb.sparkapplication.readwrite.SparkReadMysql \
  *   /root/sparkjob/original-scalatest-1.0-SNAPSHOT.jar
  * }}}
  */
object SparkReadMysql {

  def main(args: Array[String]): Unit = {

    // NOTE(review): setMaster("local") hard-coded here overrides any
    // --master flag passed to spark-submit; drop it if the submit-time
    // master should take effect.
    // Fixed: app name previously said "spark write mysql" although this job reads.
    val conf = new SparkConf().setMaster("local").setAppName("spark read mysql")

    val sc = new SparkContext(conf)

    try {
      // MySQL connection settings.
      val driverClassName = "com.mysql.jdbc.Driver"
      val url = "jdbc:mysql://ip:3306/xunwu?characterEncoding=utf8&useSSL=false"
      val user = "root"
      val password = "123456"

      // create_time is a MySQL DATETIME column; JdbcRDD binds the two '?'
      // placeholders to the lower/upper bounds of the partition range
      // (here: epoch seconds, converted server-side via from_unixtime).
      val sql = "select id,title,price,area from house where create_time > from_unixtime(?) and create_time < from_unixtime(?)"

      // Connection factory, invoked lazily on each executor; the driver class
      // must be loaded before DriverManager can resolve the URL.
      val connection = () => {
        Class.forName(driverClassName)
        DriverManager.getConnection(url, user, password)
      }

      val startTime = LocalDateTime.of(2017, 1, 3, 0, 0, 0)
      val endTime = LocalDateTime.of(2019, 11, 4, 0, 0)

      // MySQL's from_unixtime expects seconds (10 digits); Java epoch millis
      // have 13 digits, so divide by 1000. UTC+8 — presumably the database
      // server's timezone; confirm against the MySQL configuration.
      val startTimeStamp = startTime.toInstant(ZoneOffset.ofHours(8)).toEpochMilli / 1000
      val endTimeStamp = endTime.toInstant(ZoneOffset.ofHours(8)).toEpochMilli / 1000

      println("startTime: " + startTime + ", endTime: " + endTime)
      println("startTime: " + startTimeStamp + ", endTime: " + endTimeStamp)

      // Read with a single partition (numPartitions = 1) spanning the whole
      // timestamp range; increase for parallel reads on large tables.
      val result: JdbcRDD[(Int, String, Int, Int)] = new JdbcRDD[(Int, String, Int, Int)](
        sc,
        connection,
        sql,
        startTimeStamp,
        endTimeStamp,
        1,
        rs => {
          val id = rs.getInt(1)
          val title = rs.getString(2)
          val price = rs.getInt(3)
          val area = rs.getInt(4)
          (id, title, price, area)
        }
      )
      result.collect().foreach(println)
    } finally {
      // Fixed: stop the SparkContext even when the job throws, so local
      // resources and executors are always released.
      sc.stop()
    }
  }

}

 

spark读取mysql

标签:code   tar   nap   ndt   com   each   poc   pass   als   

原文地址:https://www.cnblogs.com/zxbdboke/p/12749537.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!