Input data:
3 3
3 2
3 1
2 2
2 1
1 1
---------------------
Required output:
1 1
2 1
2 2
3 1
3 2
3 3
package sort;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * First attempt: use the first column as k2 and the second column as v2.
 * The framework sorts only the map output keys, so only the first column
 * ends up sorted.
 */
public class SortApp {
    private static final String INPUT_PATH = "hdfs://hadoop:9000/data";
    private static final String OUT_PATH = "hdfs://hadoop:9000/out";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Delete the output directory if it already exists.
        FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
        fileSystem.delete(new Path(OUT_PATH), true);

        Job job = new Job(conf, SortApp.class.getSimpleName());
        job.setJarByClass(SortApp.class);

        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        job.waitForCompletion(true);
    }

    public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is "first<TAB>second".
            String[] split = value.toString().split("\t");
            context.write(new LongWritable(Long.parseLong(split[0])),
                          new LongWritable(Long.parseLong(split[1])));
        }
    }

    public static class MyReducer extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {
        @Override
        protected void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // Write one output line per value; the values themselves are not sorted.
            for (LongWritable times : values) {
                context.write(key, times);
            }
        }
    }
}
Result: k2 is sorted, but v2 takes no part in the sort. The keys come out in order (1, 2, 2, 3, 3, 3), while the order of the second column within a given key is whatever the shuffle happens to deliver, so the required output is not guaranteed.
How do we get the second column sorted as well?
The second column has to become part of k2 so that it takes part in the sort, which requires a custom serializable key type (a WritableComparable).
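Before wiring the comparison into Hadoop, the rule itself can be sanity-checked with plain Java. The sketch below is not part of the original post; the class name PairSortCheck and the in-memory data are illustrative only. It sorts the sample pairs with a comparator matching the compareTo implemented further down and prints them in the required order.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Standalone check of the intended ordering: sort by the first column,
// then by the second column when the first columns are equal.
public class PairSortCheck {
    public static void main(String[] args) {
        List<long[]> pairs = new ArrayList<>(Arrays.asList(
                new long[][]{{3, 3}, {3, 2}, {3, 1}, {2, 2}, {2, 1}, {1, 1}}));
        pairs.sort(Comparator.<long[]>comparingLong(p -> p[0])
                             .thenComparingLong(p -> p[1]));
        for (long[] p : pairs) {
            // Prints 1 1, 2 1, 2 2, 3 1, 3 2, 3 3 -- the required output above.
            System.out.println(p[0] + "\t" + p[1]);
        }
    }
}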
package sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Second attempt: wrap both columns in a custom key type (NewK2) so that
 * the framework sorts on both columns.
 */
public class SortApp {
    private static final String INPUT_PATH = "hdfs://hadoop:9000/data";
    private static final String OUT_PATH = "hdfs://hadoop:9000/out";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Delete the output directory if it already exists.
        FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
        fileSystem.delete(new Path(OUT_PATH), true);

        Job job = new Job(conf, SortApp.class.getSimpleName());
        job.setJarByClass(SortApp.class);

        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(MyMapper.class);
        // The map output key is now the custom composite type.
        job.setMapOutputKeyClass(NewK2.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        job.waitForCompletion(true);
    }

    public static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split("\t");
            context.write(new NewK2(Long.parseLong(split[0]), Long.parseLong(split[1])),
                          new LongWritable(Long.parseLong(split[1])));
        }
    }

    /**
     * No two k2 values are equal now, which means the reducer receives 6 groups.
     */
    public static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable> {
        @Override
        protected void reduce(NewK2 key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(new LongWritable(key.first), new LongWritable(key.second));
        }
    }

    /**
     * Custom sort key: serializes both columns and defines their sort order.
     */
    public static class NewK2 implements WritableComparable<NewK2> {
        long first;
        long second;

        public NewK2() {
        }

        public NewK2(long first, long second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(this.first);
            out.writeLong(this.second);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.first = in.readLong();
            this.second = in.readLong();
        }

        /**
         * Sort by the first column; when the first columns are equal,
         * sort by the second column.
         */
        @Override
        public int compareTo(NewK2 o) {
            int cmp = Long.compare(this.first, o.first);
            if (cmp != 0) {
                // The first columns differ, so they decide the order.
                return cmp;
            }
            // The first columns are equal, so let the second columns decide.
            return Long.compare(this.second, o.second);
        }
    }
}
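One point the post does not touch on: NewK2 overrides neither hashCode() nor equals(). With a single reduce task this has no effect, but if the job is run with several reducers, the default HashPartitioner distributes records by the key's hashCode(), so composite keys are conventionally given both overrides. A possible, purely illustrative version, assuming the field names used above:

// Illustrative additions to NewK2 (not in the original post).
@Override
public int hashCode() {
    // Combine both columns so logically equal keys hash alike.
    return (int) (this.first * 31 + this.second);
}

@Override
public boolean equals(Object obj) {
    if (!(obj instanceof NewK2)) {
        return false;
    }
    NewK2 other = (NewK2) obj;
    return this.first == other.first && this.second == other.second;
}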
Original post: http://www.cnblogs.com/thinkpad/p/5046939.html