码迷,mamicode.com
首页 > 数据库 > 详细

JDBC的ResultSet游标转spark的DataFrame,数据类型的映射以TeraData数据库为例

时间:2019-11-06 10:23:44      阅读:277      评论:0      收藏:0      [点我收藏+]

标签:har   lse   typename   编写   nec   scala   etc   sci   long   

1.编写给ResultSet添加spark的schema成员及DF(DataFrame)成员

/*
    spark、sc对象因为是全局的,没有导入,需自行定义
    teradata的字段类型转换成spark的数据类型
*/

import java.sql.{ResultSet, ResultSetMetaData}

import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row}

/**
  * Implicit enrichments that expose a JDBC `ResultSetMetaData` as a Spark schema and a JDBC
  * `ResultSet` as a Spark `DataFrame`, mapping Teradata column type names to Spark SQL types.
  *
  * `spark` and `sc` are neither defined nor imported in this object; they must already be in
  * scope where this code is compiled.
  */
object addDataframeMember {

  import scala.language.implicitConversions

  /** View of a `ResultSetMetaData` as a Spark schema. */
  trait ResultSetMetaDataToSchema {
    /** Number of columns reported by the metadata. */
    def columnCount: Int

    /** Spark schema with one field per column, in column order. */
    def schema: StructType
  }

  /**
    * Maps a Teradata column type name to the Spark SQL type used for that column.
    *
    * @param typeName  value of `ResultSetMetaData.getColumnTypeName`
    * @param precision column precision; only used for DECIMAL
    * @param scale     column scale; only used for DECIMAL
    * @param className Java class name of the column; only used for "Structured UDT"
    * @throws IllegalArgumentException if `typeName` is not one of the handled type names
    */
  private def teradataToSparkType(typeName: String, precision: Int, scale: Int, className: String): DataType =
    typeName match {
      case "BYTEINT" | "INTEGER" => IntegerType
      case "SMALLINT" => ShortType
      case "BIGINT" => LongType
      case "FLOAT" => DoubleType
      case "DECIMAL" => DecimalType(precision, scale)
      // CHAR is mapped to StringType (as Spark's own JDBC reader does); CharType is not
      // accepted as a row schema type by createDataFrame.
      case "CHAR" | "VARCHAR" | "CLOB" => StringType
      // BYTE / VARBYTE are binary strings, not single-byte integers.
      case "BYTE" | "VARBYTE" | "BLOB" => BinaryType
      case "DATE" => DateType
      case "TIME" | "TIMESTAMP" => TimestampType
      case "Structured UDT" => ObjectType(Class.forName(className))
      case other =>
        throw new IllegalArgumentException(s"Unsupported Teradata column type: $other")
    }

  /** Builds the Spark field (name, type, nullability, label as comment) for 1-based column `i`. */
  private def toStructField(rsmd: ResultSetMetaData, i: Int): StructField = {
    val dataType = teradataToSparkType(
      rsmd.getColumnTypeName(i), rsmd.getPrecision(i), rsmd.getScale(i), rsmd.getColumnClassName(i))
    // isNullable has three outcomes (no nulls / nullable / unknown); only an explicit
    // "no nulls" is treated as non-nullable, so "unknown" no longer raises a MatchError.
    val nullable = rsmd.isNullable(i) != ResultSetMetaData.columnNoNulls
    StructField(rsmd.getColumnName(i), dataType, nullable).withComment(rsmd.getColumnLabel(i))
  }

  /**
    * Converts a value returned by `ResultSet.getObject` into the external representation
    * matching the Spark type chosen by `teradataToSparkType`. `null` and all other values
    * pass through unchanged.
    */
  private def toSparkValue(value: AnyRef): Any = value match {
    // TIME columns are declared TimestampType; the resulting timestamp carries whatever
    // date component the driver put into the Time value.
    case t: java.sql.Time => new java.sql.Timestamp(t.getTime)
    case c: java.sql.Clob =>
      val len = c.length
      if (len == 0L) "" else c.getSubString(1L, len.toInt)
    case b: java.sql.Blob =>
      val len = b.length
      if (len == 0L) Array.emptyByteArray else b.getBytes(1L, len.toInt)
    case other => other
  }

  /** Adds `columnCount` and `schema` members to a `ResultSetMetaData`. */
  implicit def wrapResultSetMetaData(rsmd: ResultSetMetaData): ResultSetMetaDataToSchema =
    new ResultSetMetaDataToSchema {
      def columnCount: Int = rsmd.getColumnCount

      def schema: StructType = StructType((1 to columnCount).map(toStructField(rsmd, _)))
    }

  /** View of a `ResultSet` as a Spark schema and DataFrame. */
  trait ResultSetToDF {
    /** Spark schema derived from the result set's metadata. */
    def schema: StructType

    /**
      * DataFrame holding the rows read from the cursor's current position to its end.
      * All rows are collected into a local list before being parallelized, and each call
      * advances the cursor again, so the result set should be read only once.
      */
    def DF: DataFrame
  }

  /** Adds `schema` and `DF` members to a `ResultSet`. */
  implicit def wrapResultSet(rs: ResultSet): ResultSetToDF = {
    // Fetch and wrap the metadata once instead of once per row.
    val meta = wrapResultSetMetaData(rs.getMetaData)
    val width = meta.columnCount

    // Reads the row the cursor currently points at (JDBC columns are 1-based).
    def readRow(): Row = Row.fromSeq((1 to width).map(i => toSparkValue(rs.getObject(i))))

    // Advances the cursor until next() returns false, reading one Row per step.
    def readAll(): List[Row] =
      Iterator.continually(rs).takeWhile(_.next()).map(_ => readRow()).toList

    new ResultSetToDF {
      def schema: StructType = meta.schema

      def DF: DataFrame = spark.createDataFrame(sc.parallelize(readAll()), schema)
    }
  }

}

  

2.按常规方式通过JDBC建立连接,并获取结果集游标(ResultSet)

import java.sql.{Connection, DriverManager}

/*
    获取TeraData的连接
*/

// Placeholder connection settings; replace host, user, password and database before use.
val (dialect, host, user, passwd, database, charset) = ("teradata", "ip", "user", "password", "database", "ASCII")

// JDBC settings looked up by key when opening the connection.
val tdConf = collection.immutable.Map(
  "driver" -> "com.ncr.teradata.TeraDriver",
  // CHARSET is taken from `charset` so that the declared setting is the one actually used.
  "uri" -> s"jdbc:$dialect://$host/CLIENT_CHARSET=EUC_CN,TMODE=TERA,COLUMN_NAME=ON,CHARSET=$charset,database=$database",
  "username" -> user,
  "password" -> passwd
)

/**
  * Opens a new JDBC connection using the settings in `tdConf`.
  *
  * The driver class is loaded by name first, then the connection is requested from
  * `DriverManager` with the configured URI, user name and password.
  */
def getTeraConn: Connection = {
  val driverClass = tdConf("driver")
  Class.forName(driverClass)

  val url = tdConf("uri")
  val login = tdConf("username")
  val secret = tdConf("password")
  DriverManager.getConnection(url, login, secret)
}
// Query to run; `xxx` is a placeholder table name.
val sql = "SELECT TOP 10 * FROM xxx"
// Open the connection. NOTE(review): declared as `var` but never reassigned in the visible code.
var conn = getTeraConn
// Create a statement on that connection and execute the query; `rs` is the resulting cursor.
// Nothing in the visible code closes `rs`, `stmt` or `conn`.
val stmt = conn.createStatement()
val rs = stmt.executeQuery(sql)

 

3.导入隐式转换,调用成员

// Bring the implicit ResultSet conversion into scope; it is what provides the `DF` member on `rs`.
import addDataframeMember.wrapResultSet
// Build a DataFrame from the cursor and display it. `DF` keeps calling rs.next() until it
// returns false, so the cursor is at its end afterwards.
rs.DF.show()

  

JDBC的ResultSet游标转spark的DataFrame,数据类型的映射以TeraData数据库为例

标签:har   lse   typename   编写   nec   scala   etc   sci   long   

原文地址:https://www.cnblogs.com/shld/p/11803503.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!