标签:
scala> val textFile = sc.textFile("/Users/admin/spark-1.5.1-bin-hadoop2.4/README.md")
scala> val topWord = textFile.flatMap(_.split(" ")).filter(!_.isEmpty).map((_,1)).reduceByKey(_+_).map{case (word,count) =>(count,word)}.sortByKey(false)
scala> topWord.take(5).foreach(println)
redult:
(21,the)
(14,Spark)
(14,to)
(12,for)
(10,a)
原文参考:
下面就是Spark Scala REPL shell的简单实例:
1 |
scala> val hamlet = sc.textFile( "~/temp/gutenburg.txt" )
|
2 |
hamlet : org.apache.spark.rdd.RDD[String] = MappedRDD[ 1 ] at textFile at <console> : 12
|
在上面的代码中,我们读取了文件,并创建了一个String类型的RDD,每一个String代表文件中的每一行。
1 |
scala> val topWordCount = hamlet.flatMap(str = >str.split( " " ))
|
2 |
.filter(! _ .isEmpty).map(word = >(word, 1 )).reduceByKey( _ + _ ) |
3 |
.map{ case (word, count) = > (count, word)}.sortByKey( false )
|
5 |
topWordCount : org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[ 10 ] at sortByKey at <console> : 14
|
1、通过上述命令我们可以发现这个操作非常简单——通过简单的Scala API来连接transformations和actions。
2、可能存在某些words被1个以上空格分隔的情况,导致有些words是空字符串,因此需要使用filter(!_.isEmpty)将它们过滤掉。
3、每个word都被映射成一个键值对:map(word=>(word,1))。
4、为了合计所有计数,这里需要调用一个reduce步骤——reduceByKey(_+_)。 _+_ 可以非常便捷地为每个key赋值。
5、我们得到了words以及各自的counts,下一步需要做的是根据counts排序。在Apache Spark,用户只能根据key排序,而不是值。因此,这里需要使用map{case (word, count) => (count, word)}将(word, count)流转到(count, word)。
6、需要计算最常用的5个words,因此需要使用sortByKey(false)做一个计数的递减排序。
1 |
scala> topWordCount.take( 5 ).foreach(x = >println(x)) |
上述命令包含了一个.take(5) (an action operation, which triggers computation)和在 ~/temp/gutenburg.txt文本中输出10个最常用的words。
二、spark入门之spark shell:文本中发现5个最常用的word
标签:
原文地址:http://www.cnblogs.com/ylcoder/p/5730935.html