标签:price end spark sql join apach === frame 不等值连接
products一个商品价格变化的表,orders商品订单,记录每次购买商品和日期缓慢变化的商品价格表
旺仔牛奶,发生过一次价格变更
scala> val products = sc.parallelize(Array(
| ("旺仔牛奶", "2017-01-01", "2018-01-01", 4),
| ("旺仔牛奶", "2018-01-02", "2020-01-01", 5),
| ("王老吉", "2017-01-02", "2019-01-01", 5),
| ("卫龙辣条", "2010-01-01", "2020-01-01", 2)
| )).toDF("name", "startDate", "endDate", "price")
products: org.apache.spark.sql.DataFrame = [name: string, startDate: string ... 2 more fields]
scala> products.show();
+----+----------+----------+-----+
|name| startDate| endDate|price|
+----+----------+----------+-----+
|旺仔牛奶|2017-01-01|2018-01-01| 4|
|旺仔牛奶|2018-01-02|2020-01-01| 5|
| 王老吉|2017-01-02|2019-01-01| 5|
|卫龙辣条|2010-01-01|2020-01-01| 2|
+----+----------+----------+-----+
订单表(商品名称,订单日期)
旺仔牛奶在不同价格时段分别发生了一次订单
scala> val orders = sc.parallelize(Array(
| ("2017-06-01", "旺仔牛奶"),
| ("2017-07-01", "王老吉"),
| ("2018-03-01", "旺仔牛奶")
| )).toDF("date", "product")
orders: org.apache.spark.sql.DataFrame = [date: string, product: string]
scala> orders.show
+----------+-------+
| date|product|
+----------+-------+
|2017-06-01|旺仔牛奶|
|2017-07-01| 王老吉|
|2018-03-01|旺仔牛奶|
+----------+-------+
通过不等值连接,计算每个订单当时的商品价格
查看出旺仔牛奶,两个订单在不同时间段上对应的价格
scala> orders.join(products, $"product" === $"name" && $"date" >= $"startDate" && $"date" <= $"endDate").show()
+-----------+------------+----------+------------+-------------+-----+
| date | product | name | startDate | endDate | price|
+-----------+------------+----------+------------+-------------+-----+
|2017-07-01| 王老吉 | 王老吉 |2017-01-02|2019-01-01 | 5 |
|2017-06-01| 旺仔牛奶 |旺仔牛奶|2017-01-01|2018-01-01 | 4 |
|2018-03-01| 旺仔牛奶 |旺仔牛奶|2018-01-02|2020-01-01 | 5 |
+-----------+------------+----------+------------+-------------+-----+
标签:price end spark sql join apach === frame 不等值连接
原文地址:http://blog.51cto.com/10120275/2171066