
Hadoop: Custom Grouping with RawComparator



Input data:

3  3
3  2
3  2
2  2
2  1
1  1
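(The two columns are assumed to be tab-separated in the actual HDFS file, since the mapper below splits each line on \t.)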

---------------------

Requirement:

1 1
2 1
3 2

When the first column is the same, keep the minimum value of the second column.
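The standard MapReduce technique for this is a secondary sort: wrap both columns in a composite key (NewK2 below) so the framework sorts records by the first column and then by the second, and register a custom grouping comparator that compares only the first column, so every record sharing a first column lands in the same reduce() call. For the sample data, the map output sorts to (1,1) (2,1) (2,2) (3,2) (3,2) (3,3) and collapses into three reduce groups with values [1], [1,2], and [2,2,3]; the reducer then picks each group's minimum.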

package group;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Custom grouping on the composite key k2: records are sorted by both
 * columns, but grouped for reduce() by the first column only.
 */
public class GroupApp {
    
    private static final String INPUT_PATH = "hdfs://hadoop:9000/data";
    private static final String OUT_PATH = "hdfs://hadoop:9000/out";

    public static void main(String[] args) throws Exception {
        
        Configuration conf = new Configuration();
        
        // Delete the output directory if it already exists so the job can be rerun.
        FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
        fileSystem.delete(new Path(OUT_PATH), true);
        Job job = new Job(conf, GroupApp.class.getSimpleName());
        job.setJarByClass(GroupApp.class);
        
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setInputFormatClass(TextInputFormat.class);
        
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(NewK2.class); // the mapper emits the composite key, not LongWritable
        job.setMapOutputValueClass(LongWritable.class);
        
        
        // Group reduce input on the first column only, via the custom comparator.
        job.setGroupingComparatorClass(MyGroupComparator.class);
        
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);
        
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        
        job.waitForCompletion(true);
    }
    
    
    public static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable>{
        
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each line holds two tab-separated numbers; emit them together as
            // the composite key, with the second column repeated as the value.
            String[] split = value.toString().split("\t");
            long first = Long.parseLong(split[0]);
            long second = Long.parseLong(split[1]);
            context.write(new NewK2(first, second), new LongWritable(second));
        }
    } 
    
    /**
     * Every composite key k2 is distinct here, so without the grouping
     * comparator the reducer would receive six groups (one per record);
     * grouping on the first column alone collapses them to three.
     */
    public static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable>{
        @Override
        protected void reduce(NewK2 key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // Scan this group's values for the smallest second-column value.
            long min = Long.MAX_VALUE;
            for (LongWritable longWritable : values) {
                if (longWritable.get() < min) {
                    min = longWritable.get();
                }
            }
            context.write(new LongWritable(key.first), new LongWritable(min));
        }
    }
    
    
    
    /**
     * Composite key with a custom sort: orders by the first column and,
     * when the first columns are equal, by the second column.
     */
    public static class NewK2 implements WritableComparable<NewK2>{
        long first;
        long second;
        
        public NewK2(){}
        public NewK2(long first, long second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            // Serialize both fields, in the same order readFields() reads them.
            out.writeLong(this.first);
            out.writeLong(this.second);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.first = in.readLong();
            this.second = in.readLong();
        }

        /**
         * Sort by the first column; when the first columns are equal, fall
         * back to the second column.
         */
        @Override
        public int compareTo(NewK2 o) {
            // Long.compare avoids the overflow of casting a long difference to int.
            int cmp = Long.compare(this.first, o.first);
            if (cmp != 0) {
                return cmp;
            }
            // First columns are equal; order by the second column.
            return Long.compare(this.second, o.second);
        }
    }
    
    
    /**
     * Custom grouping: compare only the first column, so that keys sharing a
     * first column fall into the same reduce group.
     */
    public static class MyGroupComparator implements RawComparator<NewK2>{

        @Override
        public int compare(NewK2 o1, NewK2 o2) {
            // Object-level comparison: group on the first column only.
            return Long.compare(o1.first, o2.first);
        }
        
        /**
         * Byte-level comparison. s1/s2 are the offsets where each serialized key
         * starts in its byte array, and l1/l2 are the keys' lengths. A NewK2 is
         * two big-endian longs (16 bytes), so comparing only the first 8 bytes
         * of each key compares just the first column; for non-negative values,
         * byte order matches numeric order.
         */
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
        }
    }
    
}
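For comparison, here is a minimal sketch of the same grouping rule written by extending WritableComparator instead of implementing RawComparator directly; the class name FirstColumnGroupComparator is ours, not part of Hadoop. Passing true to the superclass constructor makes the framework deserialize the keys before calling compare(), trading the raw byte comparison's shuffle-time speed for simpler, harder-to-get-wrong code.

    // Sketch only: drop this next to MyGroupComparator inside GroupApp and
    // register it with job.setGroupingComparatorClass(FirstColumnGroupComparator.class).
    public static class FirstColumnGroupComparator extends WritableComparator {

        protected FirstColumnGroupComparator() {
            // `true` asks WritableComparator to instantiate and deserialize
            // NewK2 keys before calling compare().
            super(NewK2.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            // Same grouping rule: first column only.
            return Long.compare(((NewK2) a).first, ((NewK2) b).first);
        }
    }

Either way, the example relies on the default single reducer; with more than one reducer you would also need a partitioner that partitions on the first column, otherwise records that belong in the same group can land on different reducers.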

 

Original post: http://www.cnblogs.com/thinkpad/p/5046983.html
