Top K Top K算法有两步,一是统计词频,二是找出词频最高的前K个词。 1.实例描述 假设取Top 1,则有如下输入和输出。 输入: Hello World Bye World Hello Hadoop Bye Hadoop Bye Hadoop Hello Hadoop 输出: 词Hadoop 词频4 2.设计思路 首先统计WordCount的词频,将数据转化为(词,词频)的数据对,第二个阶段采用分 治的思想,求出RDD每个分区的Top K,最后将每个分区的Top K结果合并以产生新的集 合,在集合中统计出Top K的结果。每个分区由于存储在单机的,所以可以采用单机求Top K的方式。本例采用堆的方式。也可以直接维护一个含K个元素的数组,感兴趣的读者可以 参考其他资料了解堆的实现。 3.代码示例 Top K算法示例代码如下: import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ object TopK { def main(args:Array[String]) { /*执行WordCount,统计出最高频的词*/ val spark = new SparkContext("local", "TopK", System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass)) val count = spark.textFile("data").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _) /*统计RDD每个分区内的Top K查询*/ val topk = count.mapPartitions(iter => { while(iter.hasNext) { putToHeap(iter.next()) } getHeap().iterator } ).collect() /*将每个分区内统计出的TopK查询合并为一个新的集合,统计出TopK查询*/ val iter = topk.iterator while(iter.hasNext) { putToHeap(iter.next()) } val outiter=getHeap().iterator /*输出TopK的值*/ println("Topk 值 :") while(outiter.hasNext) { println("\n 词频:"+outiter.next()._1+" 词:"+outiter.next()._2) } spark.stop() } } def putToHeap(iter : (String, Int)) { /*数据加入含k个元素的堆中*/ …… } def getHeap(): Array[(String, Int)] = { /*获取含k个元素的堆中的元素*/ val a=new Array[(String, Int)]() …… } 4.应用场景 Top K的示例模型可以应用在求过去一段时间消费次数最多的消费者、访问最频繁的IP 地址和最近、更新、最频繁的微博等应用场景。
本文出自 “星月情缘” 博客,请务必保留此出处http://xuegodxingyue.blog.51cto.com/5989753/1949780
原文地址:http://xuegodxingyue.blog.51cto.com/5989753/1949780