Recently at work I upgraded some sql.sh scripts that ran Hive statements to Spark 2.1, and along the way I hit a problem when rewriting a SQL case when as a Scala DataFrame operation:
Setup code and data:
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import spark.implicits._
import spark.implicits._

scala> case class fpb_servercls(gridid: String, height: Int, objectid: Int, rsrp: Double, calibrategridid: Int, calibartetype: String)
defined class fpb_servercls

scala> val fpb_server_test = List(
     |   fpb_servercls("grid1", 0, 888888, -88, 53, null),
     |   fpb_servercls("grid1", 5, 888888, -99, 53, null),
     |   fpb_servercls("grid2", 0, 333333, -78, 53, null),
     |   fpb_servercls("grid4", 0, 444444, -78, 53, null)
     | ).toDF
fpb_server_test: org.apache.spark.sql.DataFrame = [gridid: string, height: int ... 4 more fields]

scala> val sampe_data_test = List(
     |   fpb_servercls("grid1", 0, 888888, -78, 53, "HOMEWIFI"),
     |   fpb_servercls("grid1", 5, 999999, -89, 53, null),
     |   fpb_servercls("grid2", 0, 333333, -87, 53, null)
     | ).toDF
sampe_data_test: org.apache.spark.sql.DataFrame = [gridid: string, height: int ... 4 more fields]
Incorrect code:
scala>         val temp_result = fpb_server_test.alias("fpb").join(sampe_data_test.alias("sample"),
     |           fpb_server_test("gridid") === sampe_data_test("gridid")
     |             && fpb_server_test("height") === sampe_data_test("height")
     |             && fpb_server_test("objectid") === sampe_data_test("objectid"), "left_outer")
     |          .select(
     |             fpb_server_test("gridid"),
     |             fpb_server_test("height"),
     |             fpb_server_test("objectid"),
     |             when(sampe_data_test("gridid") === lit(null), fpb_server_test("rsrp")).otherwise(sampe_data_test("rsrp")).alias("rsrp"),
     |             fpb_server_test("calibrategridid"),
     |             when(sampe_data_test("gridid") === lit(null), fpb_server_test("calibartetype")).otherwise(sampe_data_test("calibartetype")).alias("f_calibartetype")
     |           )
temp_result: org.apache.spark.sql.DataFrame = [gridid: string, height: int ... 4 more fields]
scala> temp_result.show
+------+------+--------+-----+---------------+---------------+
|gridid|height|objectid| rsrp|calibrategridid|f_calibartetype|
+------+------+--------+-----+---------------+---------------+
| grid1|     0|  888888|-78.0|             53|       HOMEWIFI|
| grid1|     5|  888888| null|             53|           null|
| grid2|     0|  333333|-87.0|             53|           null|
| grid4|     0|  444444| null|             53|           null|
+------+------+--------+-----+---------------+---------------+
The cause of the error is the null test. sampe_data_test("gridid") === lit(null) follows SQL three-valued logic: any comparison with NULL evaluates to NULL, never to true, so the when() branch is never taken and otherwise() always wins. That is why the unmatched rows (grid1 with height 5, and grid4) come back with null instead of the fallback values from fpb_server_test.
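To see this in isolation, here is a minimal sketch (assuming the same spark-shell session as above) comparing the two null tests directly:

// === against a NULL literal yields NULL for every row, never true,
// so when() always falls through to otherwise(). isNull, by contrast,
// returns a real boolean that when() can act on.
sampe_data_test.select(
  sampe_data_test("gridid"),
  (sampe_data_test("gridid") === lit(null)).alias("eq_null"),  // NULL on every row
  sampe_data_test("gridid").isNull.alias("is_null")            // false here, since gridid is never null
).show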
Correct usage, with isNull instead of a comparison against a null literal:
scala>  val temp_result = fpb_server_test.alias("fpb").join(sampe_data_test.alias("sample"),
     |       fpb_server_test("gridid") === sampe_data_test("gridid")
     |         && fpb_server_test("height") === sampe_data_test("height")
     |         && fpb_server_test("objectid") === sampe_data_test("objectid"), "left_outer")
     |    .select(
     |       fpb_server_test("gridid"),
     |       fpb_server_test("height"),
     |       fpb_server_test("objectid"),
     |       when(sampe_data_test("gridid").isNull, fpb_server_test("rsrp")).otherwise(sampe_data_test("rsrp")).alias("rsrp"),
     |       fpb_server_test("calibrategridid"),
     |       when(sampe_data_test("gridid").isNull, fpb_server_test("calibartetype")).otherwise(sampe_data_test("calibartetype")).alias("f_calibartetype")
     |     )
temp_result: org.apache.spark.sql.DataFrame = [gridid: string, height: int ... 4 more fields]
scala> temp_result.show
+------+------+--------+-----+---------------+---------------+
|gridid|height|objectid| rsrp|calibrategridid|f_calibartetype|
+------+------+--------+-----+---------------+---------------+
| grid1|     0|  888888|-78.0|             53|       HOMEWIFI|
| grid1|     5|  888888|-99.0|             53|           null|
| grid2|     0|  333333|-87.0|             53|           null|
| grid4|     0|  444444|-78.0|             53|           null|
+------+------+--------+-----+---------------+---------------+
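As a side note, for this "keep the right side unless the join missed" pattern, coalesce() can stand in for the when/otherwise pair. A minimal sketch under the same session assumptions; note the caveat in the comments, so treat it as an alternative rather than a drop-in replacement:

// coalesce() returns the first non-NULL argument. This matches the
// isNull/when form only when a matched right-side value is itself never
// NULL; calibartetype can be NULL on a matched row (grid2), so the two
// forms can disagree there.
val temp_result2 = fpb_server_test.alias("fpb").join(sampe_data_test.alias("sample"),
    fpb_server_test("gridid") === sampe_data_test("gridid")
      && fpb_server_test("height") === sampe_data_test("height")
      && fpb_server_test("objectid") === sampe_data_test("objectid"), "left_outer")
  .select(
    fpb_server_test("gridid"),
    fpb_server_test("height"),
    fpb_server_test("objectid"),
    coalesce(sampe_data_test("rsrp"), fpb_server_test("rsrp")).alias("rsrp"),
    fpb_server_test("calibrategridid"),
    coalesce(sampe_data_test("calibartetype"), fpb_server_test("calibartetype")).alias("f_calibartetype")
  )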
Open question: the following code runs fine in spark-shell, but when the same script is submitted with spark-submit it reports an error:
scala>   val temp_result = fpb_server_test.alias("fpb").join(sampe_data_test.alias("sample"),
     |       fpb_server_test("gridid") === sampe_data_test("gridid")
     |         && fpb_server_test("height") === sampe_data_test("height")
     |         && fpb_server_test("objectid") === sampe_data_test("objectid"), "left_outer")
     |       .selectExpr("fpb.gridid", "fpb.height", "fpb.objectid",
     |         "(case when sample.gridid is null then fpb.rsrp else sample.rsrp end) as rsrp",
     |         "fpb.calibrategridid",
     |         "(case when sample.gridid is null then fpb.calibartetype else sample.calibartetype end) as calibartetype")
temp_result: org.apache.spark.sql.DataFrame = [gridid: string, height: int ... 4 more fields]
scala> temp_result.show
+------+------+--------+-----+---------------+-------------+
|gridid|height|objectid| rsrp|calibrategridid|calibartetype|
+------+------+--------+-----+---------------+-------------+
| grid1|     0|  888888|-78.0|             53|     HOMEWIFI|
| grid1|     5|  888888|-99.0|             53|         null|
| grid2|     0|  333333|-87.0|             53|         null|
| grid4|     0|  444444|-78.0|             53|         null|
+------+------+--------+-----+---------------+-------------+
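I have not pinned down the exact cause of the spark-submit failure, but one workaround sketch (an assumption on my part, not a verified fix) is to drop the DataFrame aliases entirely: register both sides as temporary views and run the whole statement through spark.sql, so the qualified names are resolved by the SQL parser rather than by selectExpr over aliased DataFrames:

// Hedged workaround sketch: push the join and case-when into spark.sql
// so column resolution does not depend on DataFrame aliases surviving
// into selectExpr.
fpb_server_test.createOrReplaceTempView("fpb")
sampe_data_test.createOrReplaceTempView("sample")

val temp_result3 = spark.sql("""
  SELECT fpb.gridid, fpb.height, fpb.objectid,
         (CASE WHEN sample.gridid IS NULL THEN fpb.rsrp ELSE sample.rsrp END) AS rsrp,
         fpb.calibrategridid,
         (CASE WHEN sample.gridid IS NULL THEN fpb.calibartetype ELSE sample.calibartetype END) AS calibartetype
  FROM fpb
  LEFT OUTER JOIN sample
    ON fpb.gridid = sample.gridid
   AND fpb.height = sample.height
   AND fpb.objectid = sample.objectid
""")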