标签:
/**
* @author DT大数据梦工厂
* 新浪微博 http://weibo.com/ilovepains/
* 微信公众账号:DT_Spark
* 直播地址 YY频道:68917580
*/
object SparkSQLWindowFunctionOps {
def main(args: Array[String]) {
/**
* 创建SparkConf对象,设置Spark程序运行时的配置信息
* 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,如果设置为local,则代表
* Spark程序运行在本地,适合机器配置一般的初学者
*/
val conf = new SparkConf().setAppName("SparkSQLWindowFunctionOps").setMaster("spark://hadoop2001:7077")
/**
* 创建SparkContext对象
* SparkContext对象时Spark程序所有功能的唯一入口,无论是scala、java、python等都必须有一个SparkContext。
* SparkContext的核心作用:初始化Spark应用程序运行所需要的核心组件,包括DAGScheduler,TaskScheduler,SchedulerBackend
* 同事还会负责Spark程序往Master注册程序等
* SparkContext是整个Spark应用程序中最为至关重要的一个对象
*/
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)
hiveContext.sql("use hive")
hiveContext.sql("DROP TABLE IF EXISTS scores")
hiveContext.sql("CREATE TABLE IF NOT EXISTS scores(name STRING,score INT)"
+ " ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘ ‘ LINES TERMINATED BY ‘\\n‘")
hiveContext.sql("LOAD DATA LOCAL INPATH ‘/root/test/testdate/topNGroup.txt‘ INTO TABLE scores")
/**
* 使用子查询的方式完成目标数据的提取,在目标数据内部使用窗口函数row_number来进行分组排序:
* PARTITION BY:指定窗口函数分组的Key
* ORDER BY : 分组进行排序
*/
val result = hiveContext.sql("SELECT name,score " +
"FROM (" +
"SELECT name,score," +
"row_number() OVER (PARTITION BY name ORDER BY score DESC) rank" +
" FROM scores " +
") sub_scores " +
"WHERE rank <= 4")
result.show() //在Driver的控制台上打印出结果内容
hiveContext.sql("DROP TABLE IF EXISTS sortedResultScores")
result.saveAsTable("sortedResultScores")
}
}
DT大数据梦工厂由王家林老师及其团队打造,旨在为社会培养100万优秀大数据人才,Spark已是目前大数据行业主流数据处理框架和未来趋势。
关注DT大数据梦工厂公众号:
DT_Spark
查看免费公开课,内容绝对详细。
YY永久免费直播频道:68917580
王家林老师联系方式:
标签:
原文地址:http://www.cnblogs.com/yiwei00000/p/SparkWindowFunction.html