WordCount written with Spark
Scala version:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

object WordCountDemo {
  def main(args: Array[String]): Unit = {
    // Reduce log noise
    Logger.getLogger("org").setLevel(Level.WARN)
    val conf = new SparkConf().setAppName("WordCountDemo").setMaster("local")
    val sc = new SparkContext(conf)
    sc.textFile("hdfs://hadoop001:9000/in/word")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .collect()
      .foreach(println)
    sc.stop()
  }
}
Java version:
import java.util.Arrays;
import java.util.List;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class WordCountDemo {
    public static void main(String[] args) {
        // Reduce log noise
        Logger.getLogger("org").setLevel(Level.WARN);
        // Spark configuration
        SparkConf conf = new SparkConf().setAppName("WordCountDemo").setMaster("local");
        // Spark context
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // Create the RDD of input lines
        JavaRDD<String> linesRDD = jsc.textFile("hdfs://hadoop001:9000/in/word");
        // flatMap: split each line into words
        // (Spark 1.x API: call() returns Iterable; in Spark 2.x it returns Iterator<String>)
        JavaRDD<String> wordsRDD = linesRDD.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String line) throws Exception {
                String[] words = line.split(" ");
                return Arrays.asList(words);
            }
        });
        // mapToPair: pair each word with a count of 1
        JavaPairRDD<String, Integer> tupleRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });
        // reduceByKey: sum the counts per word
        JavaPairRDD<String, Integer> resultRDD = tupleRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        // Collect results back to the driver
        List<Tuple2<String, Integer>> result = resultRDD.collect();
        // Print each (word, count) pair
        for (Tuple2<String, Integer> tuple2 : result) {
            System.out.println(tuple2._1() + "=" + tuple2._2());
        }
        jsc.stop();
    }
}
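
For comparison, the anonymous-class boilerplate above collapses considerably with Java 8 lambdas. A minimal sketch of the same job, assuming Spark 2.x (where the flatMap function returns an Iterator rather than an Iterable) and the same input path:

import java.util.Arrays;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class WordCountLambdaDemo {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.WARN);
        SparkConf conf = new SparkConf().setAppName("WordCountLambdaDemo").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        jsc.textFile("hdfs://hadoop001:9000/in/word")
                // Spark 2.x: the flatMap lambda must return an Iterator
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                // pair each word with a count of 1
                .mapToPair(word -> new Tuple2<>(word, 1))
                // sum the counts per word
                .reduceByKey(Integer::sum)
                // collect to the driver and print each (word, count) pair
                .collect()
                .forEach(tuple -> System.out.println(tuple._1() + "=" + tuple._2()));
        jsc.stop();
    }
}

The logic is identical; only the function objects change, so either style produces the same output.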