码迷,mamicode.com
首页 > 其他好文 > 详细

111

时间:2018-02-03 18:59:58      阅读:163      评论:0      收藏:0      [点我收藏+]

标签:reduce   read   orm   length   oracl   tmp   tar   curl   word   

def CommonCompareWriteToOracle(hiveDF: DataFrame, bizDate: String, targetTable: String, srcId: String, spark: SparkSession): Unit = {
  // Merges `hiveDF` with the rows already present in Oracle table `targetTable`
  // for the given valuation date and source id (full outer join on the key
  // columns configured in table_join_key), preferring the Hive-side value where
  // a Hive row exists, then writes the merged result back via
  // MergeDataWriteToOracle.
  //
  // Fixes vs. original:
  //  - typographic quotes (‘) inside the SQL replaced with ASCII quotes (') —
  //    Oracle rejects the typographic ones at parse time;
  //  - log calls no longer print `df.show()` (returns Unit) or a raw
  //    Array.toString (object hash); arrays are mkString'd instead;
  //  - an empty join-key list now fails fast with a clear message instead of
  //    `reduce` throwing UnsupportedOperationException;
  //  - dropped the `rv.count` log, which forced a full Spark job just to log.
  //
  // NOTE(review): bizDate/targetTable/srcId are interpolated into SQL — if any
  // can be attacker-controlled this is SQL-injectable; confirm call sites.

  val queryOldData =
    s"(SELECT * FROM $targetTable WHERE VALUATION_DATE = TO_DATE('$bizDate','YYYY-MM-DD') AND SRC_ID = '$srcId')"
  logInfo(s"queryOldData=$queryOldData")

  val queryJoinKey =
    s"(SELECT * FROM table_join_key WHERE TABLE_NAME = '${targetTable.toUpperCase}')"
  logInfo(s"queryJoinKey=$queryJoinKey")

  // Reads an Oracle-side query as a DataFrame with the shared connection props.
  def readOracle(query: String): DataFrame =
    spark.read
      .format("jdbc")
      .option("url", OracleProperties.jdbcUrl)
      .option("dbtable", query)
      .option("user", OracleProperties.user)
      .option("password", OracleProperties.password)
      .load()

  val jdbcDF    = readOracle(queryOldData)
  val joinKeyDF = readOracle(queryJoinKey)
  logInfo("jdbc schema = " + jdbcDF.dtypes.mkString(", "))

  // Join-key column names configured per target table in table_join_key.PKEY.
  val joinKeys = joinKeyDF.select("PKEY").collect().map(_.get(0).toString)
  require(joinKeys.nonEmpty, s"no join keys configured in table_join_key for ${targetTable.toUpperCase}")
  logInfo("joinKeys = " + joinKeys.mkString(", "))

  // Full outer join: keeps rows present only in Hive, only in Oracle, or both.
  val joinCond = joinKeys.map(k => hiveDF(k) === jdbcDF(k)).reduce(_ and _)
  val rv = hiveDF.join(jdbcDF, joinCond, "outer")

  // A null Hive-side VALUATION_DATE marks an Oracle-only row of the outer join.
  val comparekey = "VALUATION_DATE"
  val cols = hiveDF.columns

  // Integer columns: substitute 0 when the Hive side is absent.
  // NOTE(review): original behavior kept — the Oracle value is discarded for
  // integer columns on Oracle-only rows; confirm that is intended.
  val numCols = hiveDF.dtypes.collect { case (name, "IntegerType") => name }
  logInfo("numCols = " + numCols.mkString(", "))
  val num_col = numCols.map { c =>
    when(hiveDF(comparekey).isNull, lit(0)).otherwise(hiveDF(c)).as(c)
  }

  // All remaining columns: prefer the Hive value, fall back to Oracle's.
  val strCols = cols.filterNot(numCols.contains)
  logInfo("strCols = " + strCols.mkString(", "))
  val str_col = strCols.map { c =>
    when(hiveDF(comparekey).isNull, jdbcDF(c)).otherwise(hiveDF(c)).as(c)
  }

  val mergeDF = rv.select((num_col ++ str_col): _*)
  MergeDataWriteToOracle(mergeDF, targetTable, srcId, bizDate)
}

111

标签:reduce   read   orm   length   oracl   tmp   tar   curl   word   

原文地址:https://www.cnblogs.com/qiuhong10/p/8410030.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!