码迷,mamicode.com
首页 > 编程语言 > 详细

Hadoop之-->自定义排序

时间:2015-12-15 00:58:08      阅读:290      评论:0      收藏:0      [点我收藏+]

标签:

data:

3  3
3  2
3  1
2  2
2  1
1  1

---------------------

需求(期望输出:先按第一列升序,第一列相同时再按第二列升序):

1  1
2  1
2  2
3  1
3  2
3  3

package sort;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * 
 * 
 * 
 */
public class SortApp {
    
    private static final String inputPaths = "hdfs://hadoop:9000/data";
    private static final String OUT_PATH = "hdfs://hadoop:9000/out";

    public static void main(String[] args) throws Exception {
        
        Configuration conf = new Configuration();
        
        FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
        fileSystem.delete(new Path(OUT_PATH), true);
        Job job = new Job(conf, SortApp.class.getSimpleName());
        job.setJarByClass(SortApp.class);
        
        FileInputFormat.setInputPaths(job, inputPaths);
        job.setInputFormatClass(TextInputFormat.class);
        
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(LongWritable.class);
        
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);
        
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        
        job.waitForCompletion(true);
    }
    
    
    public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, LongWritable>{
        
        @Override
        protected void map(LongWritable key, Text value,
                org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, LongWritable>.Context context)
                throws IOException, InterruptedException {
            
            String[] split = value.toString().split("\t");
            context.write(new LongWritable(Long.parseLong(split[0])),new LongWritable(Long.parseLong(split[1])));
            
        }
    }
    
    public static class MyReducer extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable>{
        @Override
        protected void reduce(LongWritable key, Iterable<LongWritable> values,
                org.apache.hadoop.mapreduce.Reducer<LongWritable, LongWritable, LongWritable, LongWritable>.Context context)
                throws IOException, InterruptedException {
        
            for (LongWritable times : values) {
                context.write(key, times);
            }
        }
    }
}

 

 

执行结果:只有 k2(第一列)被排序,v2(第二列)不参与排序。

技术分享

 

如何使第二列也排序呢?

这就需要把第二列也并入 k2 一起参与排序,为此要自定义一个既可序列化又可比较的键类型(实现 WritableComparable 接口)。

 

package sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * MapReduce job that sorts two-column, tab-separated numeric records by the first column and,
 * for equal first columns, by the second column.
 *
 * <p>Both columns are packed into the composite map output key {@link NewK2}, so the shuffle
 * sort orders complete rows instead of only the first column.
 */
public class SortApp {

    /** HDFS location of the input data. */
    private static final String INPUT_PATHS = "hdfs://hadoop:9000/data";
    /** HDFS output directory; it is deleted before every run. */
    private static final String OUT_PATH = "hdfs://hadoop:9000/out";

    /**
     * Configures and runs the job, then exits with status 0 on success and 1 on failure.
     *
     * @param args ignored; input and output locations are fixed constants
     * @throws Exception if the file system cannot be reached or the job cannot be submitted
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // FileOutputFormat rejects an already existing output directory, so remove it first.
        FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
        fileSystem.delete(new Path(OUT_PATH), true);

        Job job = new Job(conf, SortApp.class.getSimpleName());
        job.setJarByClass(SortApp.class);

        FileInputFormat.setInputPaths(job, INPUT_PATHS);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(MyMapper.class);
        // Must match the key type the mapper really emits; declaring LongWritable here makes
        // the map task fail with a key type mismatch.
        job.setMapOutputKeyClass(NewK2.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));

        // Propagate the job result so shells and schedulers can detect a failed run.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Parses each input line {@code "<first>\t<second>"} and emits
     * {@code (NewK2(first, second), second)}.
     */
    public static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable> {

        // Reused across calls; context.write serializes the contents immediately.
        private final LongWritable outValue = new LongWritable();

        /**
         * @param key     byte offset of the line within the input file
         * @param value   one line of input text
         * @param context sink for the composite key and the second column
         * @throws IOException if the line does not contain two tab-separated columns
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty()) {
                // Blank lines (for example a trailing newline) carry no record.
                return;
            }

            String[] split = line.split("\t");
            if (split.length < 2) {
                throw new IOException("Malformed record at offset " + key.get()
                        + ", expected two tab-separated columns: \"" + line + "\"");
            }

            long first = Long.parseLong(split[0].trim());
            long second = Long.parseLong(split[1].trim());
            outValue.set(second);
            context.write(new NewK2(first, second), outValue);
        }
    }

    /**
     * Unpacks the composite key back into two output columns.
     *
     * <p>With the sample data no two keys are equal, so the reducer receives six groups. When
     * the input contains identical rows they share one group; one record is written per value
     * so those duplicates are kept instead of being collapsed into a single row.
     */
    public static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable> {

        private final LongWritable outKey = new LongWritable();
        private final LongWritable outValue = new LongWritable();

        /**
         * @param key     composite key holding both columns of the row
         * @param values  one entry per input row that produced this key
         * @param context sink for the output records
         */
        @Override
        protected void reduce(NewK2 key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            outKey.set(key.frist);
            outValue.set(key.second);
            for (LongWritable ignored : values) {
                context.write(outKey, outValue);
            }
        }
    }

    /**
     * Composite sort key: orders by the first column, then by the second column.
     *
     * <p>The misspelled field name {@code frist} is kept on purpose because the field is
     * package-visible and may be read by other classes in this package.
     */
    public static class NewK2 implements WritableComparable<NewK2> {
        /** Value of the first column. */
        long frist;
        /** Value of the second column. */
        long second;

        /** No-argument constructor required so Hadoop can instantiate the key reflectively. */
        public NewK2() {
        }

        /**
         * @param frist  value of the first column
         * @param second value of the second column
         */
        public NewK2(long frist, long second) {
            this.frist = frist;
            this.second = second;
        }

        /** Serializes both columns in the order {@link #readFields(DataInput)} reads them. */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(this.frist);
            out.writeLong(this.second);
        }

        /** Restores both columns in the order {@link #write(DataOutput)} wrote them. */
        @Override
        public void readFields(DataInput in) throws IOException {
            this.frist = in.readLong();
            this.second = in.readLong();
        }

        /**
         * Compares by the first column and falls back to the second column on a tie.
         *
         * <p>Values are compared directly rather than subtracted: a {@code long} difference
         * cast to {@code int} can overflow or truncate and report the wrong sign.
         */
        @Override
        public int compareTo(NewK2 o) {
            int byFirst = compareLongs(this.frist, o.frist);
            if (byFirst != 0) {
                return byFirst;
            }
            return compareLongs(this.second, o.second);
        }

        /** Overflow-safe three-way comparison of two {@code long} values. */
        private static int compareLongs(long a, long b) {
            return a < b ? -1 : (a == b ? 0 : 1);
        }

        /** Two keys are equal when both columns match, consistent with {@link #compareTo}. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof NewK2)) {
                return false;
            }
            NewK2 other = (NewK2) obj;
            return this.frist == other.frist && this.second == other.second;
        }

        /**
         * Value-based hash. The default hash partitioner routes records by the key's
         * {@code hashCode()}, so an identity hash would scatter equal keys across reducers.
         */
        @Override
        public int hashCode() {
            int result = (int) (this.frist ^ (this.frist >>> 32));
            result = 31 * result + (int) (this.second ^ (this.second >>> 32));
            return result;
        }

        @Override
        public String toString() {
            return this.frist + "\t" + this.second;
        }
    }
}

 

技术分享

 

Hadoop之-->自定义排序

标签:

原文地址:http://www.cnblogs.com/thinkpad/p/5046939.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!