码迷,mamicode.com
首页 > 其他好文 > 详细

Scala实现Mapreduce程序2-----Top5

时间:2017-06-03 11:17:36      阅读:169      评论:0      收藏:0      [点我收藏+]

标签:sort   override   output   cal   string   inpu   top   exception   lin   

输入n个数,返回TOP5的数字

scala实现:以各个数字为key,value为空字符串"",按照key降序排序,取出前5个

/**
 * Prints the five largest integers found in the third comma-separated field
 * of the text file `/spark/test`.
 *
 * Only non-blank lines that split into exactly four comma-separated fields are
 * considered; lines whose third field is not a valid integer are skipped.
 * Each result is printed as `top index:<rank>\t<value>`, with rank starting at 1.
 */
object Top5 {
  def main(args: Array[String]): Unit = {
    // An empty master URL and a missing application name both make the
    // SparkContext constructor fail. Set a name, and fall back to local mode
    // only when the launcher (e.g. spark-submit) has not chosen a master.
    val conf = new SparkConf()
      .setAppName("Top5")
      .setIfMissing("spark.master", "local[*]")
    val sc = new SparkContext(conf)

    try {
      val numbers = sc.textFile("/spark/test")
        .filter(line => line.trim.nonEmpty)
        .map(_.split(","))
        .filter(_.length == 4)
        // Skip records whose third field is not an integer instead of
        // aborting the whole job with a NumberFormatException.
        .flatMap(fields => scala.util.Try(fields(2).trim.toInt).toOption)

      // top(5) returns the five largest elements in descending order without
      // shuffling and sorting the complete data set.
      numbers.top(5).zipWithIndex.foreach { case (value, position) =>
        println("top index:" + (position + 1) + "\t" + value)
      }
    } finally {
      sc.stop()
    }
  }
}

Mapreduce实现,(key,"") =>(index+"",key)

MapReduce中的IntWritable默认是按照升序排列的,要实现降序排序(从而先取到最大的数),需要自己实现MyIntWritable
/**
 * Integer key whose natural ordering is descending: larger numbers compare
 * as "smaller", so they come first when keys are sorted.
 *
 * The value is held as a primitive {@code int}; an instance created with the
 * no-argument constructor holds 0 until {@link #readFields(DataInput)} is called.
 */
public class MyIntWritable implements WritableComparable<MyIntWritable> {
    private int num;

    /**
     * @param num the wrapped value; must not be null (it is unboxed here)
     */
    public MyIntWritable(Integer num) {
        this.num = num;
    }

    /** No-argument constructor. */
    public MyIntWritable() {
    }

    /** Writes the wrapped value as a single 4-byte int. */
    public void write(DataOutput output) throws IOException {
        output.writeInt(num);
    }

    /** Reads the wrapped value back from a single 4-byte int. */
    public void readFields(DataInput input) throws IOException {
        this.num = input.readInt();
    }

    /**
     * Orders instances in descending numeric order.
     *
     * Uses {@link Integer#compare(int, int)} with swapped operands rather than
     * negating a subtraction, which overflows when the operands have opposite
     * signs and a large difference.
     */
    public int compareTo(MyIntWritable o) {
        return Integer.compare(o.num, this.num);
    }

    /** Same value as {@code Integer.valueOf(num).hashCode()}. */
    @Override
    public int hashCode() {
        return num;
    }

    /** Returns the decimal representation of the wrapped value. */
    @Override
    public String toString() {
        return Integer.toString(num);
    }

    /**
     * Misspelled predecessor of {@link #toString()}, kept so existing callers
     * still compile.
     *
     * @deprecated use {@link #toString()}
     */
    @Deprecated
    public String toSting() {
        return toString();
    }

    /**
     * Two instances are equal when they wrap the same number; any other
     * object (including null) is not equal.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof MyIntWritable)) {
            return false;
        }
        return this.num == ((MyIntWritable) obj).num;
    }
}
package HadoopvsSpark;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* Created by Administrator on 2017/5/26.
*/
public class TopN {
public static class TopNMapper extends Mapper<LongWritable,Text,MyIntWritable,Text>{
public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException {
String line=value.toString();
if(line.trim().length()>0){
String str[]=line.split( "," );
if(str.length==4){
context.write( new MyIntWritable( Integer.parseInt( str[2] ) ),new Text( "" ) );
}
}
}
}

public static class TopNReducer extends Reducer<MyIntWritable,Text,Text,MyIntWritable>{
private int index=0;
public void reduce(MyIntWritable key,Iterable<Text> values,Context context) throws IOException, InterruptedException {
index++;
if(index<=5){
context.write( new Text( index+" " ),key );
}
}
}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {


org.apache.hadoop.conf.Configuration conf=new org.apache.hadoop.conf.Configuration();
Job job=new Job(conf,"topn");
job.setJarByClass( TopN.class );

job.setMapperClass( TopNMapper.class );
job.setMapOutputKeyClass( MyIntWritable.class );
job.setMapOutputValueClass( Text.class );

job.setReducerClass( TopNReducer.class );
job.setOutputKeyClass( Text.class);
job.setOutputValueClass( MyIntWritable.class );

FileInputFormat.addInputPath( job,new Path( args[0] ) );
Path outputdir=new Path( args[1] );
FileSystem fs=FileSystem.get( conf ); //判断输出目录是否存在
if(fs.exists( outputdir )){
fs.delete( outputdir,true );
}
FileOutputFormat.setOutputPath( job,outputdir ) ;
System.out.println(job.waitForCompletion( true )?1:0);
}
}

 

Scala实现Mapreduce程序2-----Top5

标签:sort   override   output   cal   string   inpu   top   exception   lin   

原文地址:http://www.cnblogs.com/sunt9/p/6936383.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!