
[Revisited] MapReducer [Part 1]


Tags: package, import, mapreducer, record, count, dedup

    A friend told me about a problem yesterday; I've forgotten the exact wording, but the data looks like this:

<1,0> 
<2,8>
<1,9>
<2,7>
<1,0>
<3,15>
<5,20>  
<3,25>
<4,20>
<3,50>

    The expected result looks like this:

1    2
2    2
3    3
4    1
5    1

    Count the left-hand values while deduplicating the pairs: when both the left and right values repeat, the pair is recorded only once; when the left value repeats but the right value differs, the count for that left value goes up; when both values differ, each left value is counted on its own. For example, the left value 1 appears in <1,0>, <1,9> and <1,0>; after removing the duplicate <1,0> there are two distinct pairs, so its count is 2.

    Once you grasp the idea, this is just a small exercise in deduplicating and then counting data. I won't spell out the approach; follow the code and let your imagination fill in the rest (the code only handles the case above).
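
    As a quick sanity check (not part of the original post; the class name DedupCountSketch and the hard-coded sample data are my own), here is a minimal plain-Java sketch of the same dedup-then-count logic, which reproduces the expected result. The MapReduce version follows it.

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class DedupCountSketch {

    public static void main(String[] args) {
        String[] lines = { "<1,0>", "<2,8>", "<1,9>", "<2,7>", "<1,0>",
                "<3,15>", "<5,20>", "<3,25>", "<4,20>", "<3,50>" };

        // Step 1: deduplicate whole pairs (what the first MapReduce job does).
        Set<String> distinctPairs = new LinkedHashSet<String>(Arrays.asList(lines));

        // Step 2: count how many distinct pairs share each left value
        // (what the second MapReduce job does).
        Map<String, Integer> counts = new TreeMap<String, Integer>();
        for (String pair : distinctPairs) {
            String left = pair.split(",")[0].replace("<", ""); // "<1,0>" -> "1"
            Integer old = counts.get(left);
            counts.put(left, old == null ? 1 : old + 1);
        }

        // Prints (tab-separated): 1 2, 2 2, 3 3, 4 1, 5 1
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}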

package com.amir.test;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class MapReducer_MulTask {

    // Job 1 mapper: emits each whole input line as the key with an empty
    // value, so identical lines land in the same reduce group.
    public static class MassRemovingMap extends MapReduceBase implements
            Mapper<Object, Text, Text, Text> {

        private Text line = new Text();

        public void map(Object key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            line.set(value); // the whole line, e.g. "<1,0>", becomes the key
            output.collect(line, new Text(""));
        }
    }

    // Job 1 reducer (also used as the combiner): each distinct line arrives
    // here exactly once as a key, so emitting just the key deduplicates.
    // Note the value type must be Text to match the mapper's output,
    // not IntWritable.
    public static class MassRemovingReduce extends MapReduceBase implements
            Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterator<Text> value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            output.collect(key, new Text(""));
        }
    }

    // Job 2 mapper: parses a deduplicated line such as "<1,0>" and emits
    // (left value, 1), e.g. ("1", 1).
    public static class StatisticsMap extends MapReduceBase implements
            Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {

            // Tokenizing on whitespace drops the tab and empty value written
            // by the first job, leaving tokens like "<1,0>".
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String[] temp = itr.nextToken().split(","); // ["<1", "0>"]
                String akey = temp[0].replace("<", "");     // "1"
                word.set(akey);
                output.collect(word, one);
            }
        }
    }

    // Job 2 reducer: sums the 1s emitted for each left value, i.e. counts
    // how many distinct pairs start with that value.
    public static class StatisticsReduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterator<IntWritable> value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (value.hasNext()) {
                sum += value.next().get();
            }
            result.set(sum);
            output.collect(key, result);
        }
    }

    // Job 1: deduplicate the raw input lines.
    public static void TaskMassRemoving() throws IOException {
        String[] param = { "/test/testw/ss", "/test/testw/woutput" };
        Configuration conf = new Configuration();
        JobConf jobconf = new JobConf(conf, MapReducer_MulTask.class);
        jobconf.setJobName("TaskMassRemoving");

        jobconf.setJarByClass(MapReducer_MulTask.class);
        jobconf.setMapperClass(MassRemovingMap.class);
        // The reducer only re-emits keys, so it is safe to reuse as the combiner.
        jobconf.setCombinerClass(MassRemovingReduce.class);
        jobconf.setReducerClass(MassRemovingReduce.class);
        jobconf.setOutputKeyClass(Text.class);
        jobconf.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(jobconf, new Path(param[0]));
        FileOutputFormat.setOutputPath(jobconf, new Path(param[1]));
        JobClient.runJob(jobconf); // runJob blocks until the job completes
    }
    
    // Job 2: count left values in the deduplicated output of job 1.
    public static void TaskStatistics() throws IOException {
        String[] param = { "/test/testw/woutput/part-00000", "/test/testw/woutput/wordcount" };
        Configuration conf = new Configuration();
        JobConf jobconf = new JobConf(conf, MapReducer_MulTask.class);
        jobconf.setJobName("TaskStatistics");

        jobconf.setJarByClass(MapReducer_MulTask.class);
        jobconf.setMapperClass(StatisticsMap.class);
        // Summing is associative, so the reducer doubles as the combiner.
        jobconf.setCombinerClass(StatisticsReduce.class);
        jobconf.setReducerClass(StatisticsReduce.class);

        jobconf.setOutputKeyClass(Text.class);
        jobconf.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(jobconf, new Path(param[0]));
        FileOutputFormat.setOutputPath(jobconf, new Path(param[1]));
        JobClient.runJob(jobconf); // blocks until the job completes
    }
    
    public static void main(String[] args) throws IOException {
        try {
            MapReducer_MulTask.TaskMassRemoving(); // 01: deduplicate pairs
            MapReducer_MulTask.TaskStatistics();   // 02: count left values
            System.out.println("OK!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
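
    To see what the second mapper does with a single line of the first job's output (each distinct pair followed by a tab and an empty value), here is a tiny standalone sketch; it is not part of the original post and only replays the parsing step of StatisticsMap.

import java.util.StringTokenizer;

public class ParseDemo {

    public static void main(String[] args) {
        // One line as written by the first job: key + tab + empty value.
        String line = "<1,0>\t";

        // StringTokenizer splits on whitespace, so the trailing tab is dropped
        // and only the pair "<1,0>" remains as a token.
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            String[] temp = itr.nextToken().split(","); // ["<1", "0>"]
            String akey = temp[0].replace("<", "");     // "1"
            System.out.println(akey);                   // prints: 1
        }
    }
}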

    This is mainly a test of basic MapReduce usage.

This post is from the blog "荒废了五年以后"; please keep this attribution: http://superamir.blog.51cto.com/9720215/1720122
