A MapReduce secondary-sort example: the mapper wraps both sort columns in a composite key (NewK2) so the shuffle orders records by the first column and, within equal first columns, by the second.

package com.qq;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortApp {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SortApp.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(NewK2.class);
        job.setMapOutputValueClass(LongWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);
        job.setNumReduceTasks(1);
        // The output directory must not already exist, or the job will fail.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }

    static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input lines are comma-separated; fields 1 and 2 (0-indexed) hold the two sort columns.
            final String[] splited = value.toString().split(",");
            final NewK2 k2 = new NewK2(Long.parseLong(splited[1]), Long.parseLong(splited[2]));
            final LongWritable v2 = new LongWritable(Long.parseLong(splited[2]));
            context.write(k2, v2);
        }
    }

    static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable> {
        @Override
        protected void reduce(NewK2 k2, Iterable<LongWritable> v2s, Context context)
                throws IOException, InterruptedException {
            context.write(new LongWritable(k2.first), new LongWritable(k2.second));
        }
    }

    // Composite key: when the first columns differ, sort ascending by the first column;
    // when they are equal, sort ascending by the second column.
    static class NewK2 implements WritableComparable<NewK2> {
        long first;
        long second;

        public NewK2() {
        }

        public NewK2(long first, long second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.first = in.readLong();
            this.second = in.readLong();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(first);
            out.writeLong(second);
        }

        @Override
        public int compareTo(NewK2 o) {
            // Long.compare avoids the overflow that casting a long difference to int can cause.
            int cmp = Long.compare(this.first, o.first);
            if (cmp != 0) {
                return cmp;
            }
            return Long.compare(this.second, o.second);
        }

        @Override
        public int hashCode() {
            return Long.hashCode(first) + Long.hashCode(second);
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof NewK2)) {
                return false;
            }
            NewK2 oK2 = (NewK2) obj;
            return this.first == oK2.first && this.second == oK2.second;
        }
    }
}
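A minimal sketch of running the job, assuming the compiled jar is named sort.jar (a hypothetical name) and the input/output paths are passed as the two arguments:

    hadoop jar sort.jar com.qq.SortApp /input /out

Given a hypothetical input file such as

    a,3,3
    a,3,2
    a,2,2
    a,2,1
    a,1,1

the default grouping uses the full NewK2 key, so each distinct (first, second) pair reaches the reducer once and the output is fully sorted, tab-separated:

    1	1
    2	1
    2	2
    3	1
    3	2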
Original article: http://www.cnblogs.com/52hadoop/p/4287501.html