码迷,mamicode.com
首页 > 编程语言 > 详细

【Hadoop】Hadoop MR 自定义排序

时间:2016-09-07 12:54:50      阅读:278      评论:0      收藏:0      [点我收藏+]

标签:

1、概念

技术分享

2、代码示例

FlowSort

package com.ares.hadoop.mr.flowsort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

import com.ares.hadoop.mr.exception.LineException;

public class FlowSort extends Configured implements Tool {
    private static final Logger LOGGER = Logger.getLogger(FlowSort.class);
    enum Counter {
        LINESKIP
    }
    
    public static class FlowSortMapper extends Mapper<LongWritable, Text, 
        FlowBean, NullWritable> {
        private String line;
        private int length;
        private final static char separator = \t;
        
        private String phoneNum;
        private long upFlow;
        private long downFlow;
        private long sumFlow;
        
        private FlowBean flowBean = new FlowBean();
        private NullWritable nullWritable = NullWritable.get();
        
        @Override
        protected void map(
                LongWritable key,
                Text value,
                Mapper<LongWritable, Text, FlowBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            //super.map(key, value, context);
            String errMsg;
            try {
                line = value.toString();
                String[] fields = StringUtils.split(line, separator);
                length = fields.length;
                if (length != 4) {
                    throw new LineException(key.get() + ", " + line + " LENGTH INVALID, IGNORE...");
                }
                
                phoneNum = fields[0];
                upFlow = Long.parseLong(fields[1]);
                downFlow = Long.parseLong(fields[2]);
                sumFlow = Long.parseLong(fields[3]);
                
                flowBean.setPhoneNum(phoneNum);
                flowBean.setUpFlow(upFlow);
                flowBean.setDownFlow(downFlow);
                flowBean.setSumFlow(sumFlow);
                
                context.write(flowBean, nullWritable);
            } catch (LineException e) {
                // TODO: handle exception
                LOGGER.error(e);
                System.out.println(e);
                context.getCounter(Counter.LINESKIP).increment(1);
                return;
            } catch (NumberFormatException e) {
                // TODO: handle exception
                errMsg = key.get() + ", " + line + " FLOW DATA INVALID, IGNORE...";
                LOGGER.error(errMsg);
                System.out.println(errMsg);
                context.getCounter(Counter.LINESKIP).increment(1);
                return;
            } catch (Exception e) {
                // TODO: handle exception
                LOGGER.error(e);
                System.out.println(e);
                context.getCounter(Counter.LINESKIP).increment(1);
                return;
            }            
        }
    }
    
    public static class FlowSortReducer extends Reducer<FlowBean, NullWritable, 
        FlowBean, NullWritable> {
        @Override
        protected void reduce(
                FlowBean key,
                Iterable<NullWritable> values,
                Reducer<FlowBean, NullWritable, FlowBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            //super.reduce(arg0, arg1, arg2);
            context.write(key, NullWritable.get());
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // TODO Auto-generated method stub
        String errMsg = "FlowSort: TEST STARTED...";
        LOGGER.debug(errMsg);
        System.out.println(errMsg);
        
        Configuration conf = new Configuration();
        //FOR Eclipse JVM Debug  
        //conf.set("mapreduce.job.jar", "flowsum.jar");
        Job job = Job.getInstance(conf);
        
        // JOB NAME
        job.setJobName("FlowSort");
        
        // JOB MAPPER & REDUCER
        job.setJarByClass(FlowSort.class);
        job.setMapperClass(FlowSortMapper.class);
        job.setReducerClass(FlowSortReducer.class);
        
        // MAP & REDUCE
        job.setOutputKeyClass(FlowBean.class);
        job.setOutputValueClass(NullWritable.class);
        // MAP
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        
        // JOB INPUT & OUTPUT PATH
        //FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.setInputPaths(job, args[1]);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        
        // VERBOSE OUTPUT
        if (job.waitForCompletion(true)) {
            errMsg = "FlowSort: TEST SUCCESSFULLY...";
            LOGGER.debug(errMsg);
            System.out.println(errMsg);
            return 0;
        } else {
            errMsg = "FlowSort: TEST FAILED...";
            LOGGER.debug(errMsg);
            System.out.println(errMsg);
            return 1;
        }            
        
    }
    
    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            String errMsg = "FlowSort: ARGUMENTS ERROR";
            LOGGER.error(errMsg);
            System.out.println(errMsg);
            System.exit(-1);
        }
        
        int result = ToolRunner.run(new Configuration(), new FlowSort(), args);
        System.exit(result);
    }
}

FlowBean

package com.ares.hadoop.mr.flowsort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

import org.apache.hadoop.io.WritableComparable;

/**
 * Per-phone traffic record used as the map output key of the FlowSort job.
 *
 * <p>Instances sort by {@code sumFlow} in descending order. Ties are broken by phone number and
 * then by upstream/downstream flow, so {@link #compareTo} returns 0 only when all four fields
 * are equal. That keeps the ordering a valid total order that is consistent with
 * {@link #equals(Object)}.
 */
public class FlowBean implements WritableComparable<FlowBean> {
    private String phoneNum;
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    /** Creates an empty bean whose fields are filled by setters or {@link #readFields}. */
    public FlowBean() {
    }

    /**
     * Creates a fully populated bean.
     *
     * @param phoneNum phone number identifying the record
     * @param upFlow upstream flow
     * @param downFlow downstream flow
     * @param sumFlow total flow, used as the primary sort key
     */
    public FlowBean(String phoneNum, long upFlow, long downFlow, long sumFlow) {
        this.phoneNum = phoneNum;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = sumFlow;
    }

    public String getPhoneNum() {
        return phoneNum;
    }

    public void setPhoneNum(String phoneNum) {
        this.phoneNum = phoneNum;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /** Reads the fields in the same order {@link #write} emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNum = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    /** Writes the fields; {@code phoneNum} must be non-null because {@code writeUTF} rejects null. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNum);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /** Tab-separated "phone, up, down, sum" line, as written to the job output. */
    @Override
    public String toString() {
        return "" + phoneNum + "\t" + upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    /**
     * Orders by {@code sumFlow} descending, then phone number, upstream and downstream flow
     * ascending. Returns 0 only for beans whose four fields are all equal.
     */
    @Override
    public int compareTo(FlowBean o) {
        // Arguments swapped on purpose: larger totals sort first.
        int result = Long.compare(o.sumFlow, sumFlow);
        if (result != 0) {
            return result;
        }
        result = compareNullable(phoneNum, o.phoneNum);
        if (result != 0) {
            return result;
        }
        result = Long.compare(upFlow, o.upFlow);
        if (result != 0) {
            return result;
        }
        return Long.compare(downFlow, o.downFlow);
    }

    /** Equal iff all four fields are equal, matching {@link #compareTo} returning 0. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof FlowBean)) {
            return false;
        }
        FlowBean other = (FlowBean) obj;
        return upFlow == other.upFlow
                && downFlow == other.downFlow
                && sumFlow == other.sumFlow
                && Objects.equals(phoneNum, other.phoneNum);
    }

    /** Hash over the same fields as {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        return Objects.hash(phoneNum, upFlow, downFlow, sumFlow);
    }

    /** Null-safe string comparison with nulls ordered first. */
    private static int compareNullable(String a, String b) {
        if (a == null) {
            return b == null ? 0 : -1;
        }
        if (b == null) {
            return 1;
        }
        return a.compareTo(b);
    }
}

LineException

package com.ares.hadoop.mr.exception;

/**
 * Unchecked exception describing an input line that cannot be processed.
 *
 * <p>Offers the four standard {@link RuntimeException} constructor shapes.
 */
public class LineException extends RuntimeException {
    private static final long serialVersionUID = 2536144005398058435L;

    /** Creates an exception without a detail message or cause. */
    public LineException() {
        super();
    }

    /**
     * Creates an exception with a detail message only.
     *
     * @param message description of the offending line
     */
    public LineException(String message) {
        super(message);
    }

    /**
     * Creates an exception wrapping a cause; the message is derived from the cause.
     *
     * @param cause underlying failure
     */
    public LineException(Throwable cause) {
        super(cause);
    }

    /**
     * Creates an exception with both a detail message and a cause.
     *
     * @param message description of the offending line
     * @param cause underlying failure
     */
    public LineException(String message, Throwable cause) {
        super(message, cause);
    }
}

 

【Hadoop】Hadoop MR 自定义排序

标签:

原文地址:http://www.cnblogs.com/junneyang/p/5848770.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!