def CommonCompareWriteToOracle(hiveDF: DataFrame, bizDate: String, targetTable: String, srcId: String, spark: SparkSession): Unit = {
val queryOldData = "(SELECT * FROM " + targetTable + " WHERE VALUATION_DATE = TO_DATE(‘" + bizDate + "‘,‘YYYY-MM-DD‘) AND SRC_ID = ‘" + srcId + "‘)"
logInfo("queryOldData="+queryOldData)
val queryJoinKey = "(SELECT * FROM table_join_key WHERE TABLE_NAME = ‘" + targetTable.toUpperCase() + "‘)"
logInfo("queryJoinKey="+queryJoinKey)
val jdbcDF = spark.read
.format("jdbc")
.option("url", OracleProperties.jdbcUrl)
.option("dbtable", queryOldData)
.option("user", OracleProperties.user)
.option("password", OracleProperties.password)
.load()
val joinKeyDF = spark.read
.format("jdbc")
.option("url", OracleProperties.jdbcUrl)
.option("dbtable", queryJoinKey)
.option("user", OracleProperties.user)
.option("password", OracleProperties.password)
.load()
logInfo("jdbc sparkDF = " + jdbcDF.show())
logInfo("jdbc dtypes = " + jdbcDF.dtypes)
logInfo("joinKeyDF = " + joinKeyDF.show())
val tmp = joinKeyDF.select("PKEY").collect().map(_(0).toString())
val joinKeyList = new Array[org.apache.spark.sql.Column](tmp.length)
for (i<- 0 until tmp.length)
joinKeyList(i)=hiveDF(tmp(i))===jdbcDF(tmp(i))
logInfo("joinKeyList = " + joinKeyList)
logInfo("joinKeyList = " + joinKeyList)
val rv = hiveDF.join(jdbcDF,joinKeyList.reduce(_ and _),"outer")
logInfo("rv.count = " + rv.count)
val comparekey = "VALUATION_DATE"
val cols = hiveDF.columns
val a=hiveDF.dtypes
logInfo("hiveDF.dtypes= "+ hiveDF.dtypes)
val b = new ArrayBuffer[String]()
for (i <- a if i._2=="IntegerType") b+=i._1
val numArray = b.toArray
logInfo("numArray = " + numArray)
val num_col = new Array[org.apache.spark.sql.Column](numArray.length)
for (i <- 0 until numArray.length) num_col(i)=when(hiveDF(comparekey).isNull, lit(0)).otherwise(hiveDF(numArray(i))).as(numArray(i))
logInfo("num_col = " + num_col)
val strArray = cols.filterNot(numArray.contains(_))
val str_col = new Array[org.apache.spark.sql.Column](strArray.length)
for (i <- 0 until strArray.length) str_col(i)=when(hiveDF(comparekey).isNull, jdbcDF(strArray(i))).otherwise(hiveDF(strArray(i))).as(strArray(i))
logInfo("strArray = " + strArray)
logInfo("str_col = " + str_col)
val mergeDF = rv.select((num_col ++ str_col):_*)
MergeDataWriteToOracle(mergeDF, targetTable, srcId, bizDate)
}