码迷,mamicode.com
首页 > 其他好文 > 详细

MapReduce 编程模板

时间:2015-01-19 20:52:11      阅读:144      评论:0      收藏:0      [点我收藏+]

标签:

1.MapReduce 编程模型的5个步骤:

  1)迭代,将输入数据解析成 key/value 对;

  2)将解析的 key/value经过Map处理映射成另一组key/value对;

  3)根据key进行分组;

  4)以分组为单位进行归约(Reduce 过程);

  5)迭代,输出最终结果。

2.MapReduce编程模型模板:

  在进行编程过程只需改变Map()和Reduce()方法,如果没有Reduce过程时需要对run()作适当调整。

  1 import java.io.IOException;
  2 import java.util.jar.JarException;
  3 import org.apache.hadoop.conf.Configuration;
  4 import org.apache.hadoop.conf.Configured;
  5 import org.apache.hadoop.fs.Path;
  6 import org.apache.hadoop.io.LongWritable;
  7 import org.apache.hadoop.io.NullWritable;
  8 import org.apache.hadoop.io.Text;
  9 import org.apache.hadoop.mapreduce.Job;
 10 import org.apache.hadoop.mapreduce.Mapper;
 11 import org.apache.hadoop.mapreduce.Reducer;
 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 16 import org.apache.hadoop.util.Tool;
 17 import org.apache.hadoop.util.ToolRunner;
 18 import org.apache.jasper.compiler.JavacErrorDetail;
 19 
 20 public class Example extends Configured implements Tool{
 21     
 22     enum Counter{
 23         LINESKIP; //输出错误行
 24     }
 25     
 26     /*MapClass
 27      * Mapper<
 28      * LongWritable            输入的 key 
 29      * Text                 输入的 value
 30      * NullWritable/Text    输出的 key
 31      * Text                    输出的 value
 32      * >
 33      * */
 34     //public static class Map extends Mapper<LongWritable,Text,NullWritable,Text>   //没有Reduce过程时
 35     public static class Map extends Mapper<LongWritable,Text,Text,Text>{
 36         public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException
 37         {
 38             String mydata=value.toString();   //读取源数据
 39              try{
 40                 //数据处理
 41                 String[] mydataSplite=mydata.split("");  //数据切分 
 42                 String aData=mydataSplite[0];
 43                 String bData=mydataSplite[1];
 44                 
 45                 /*
 46                  * 没有Reduce过程时
 47                  * Text outText=new Text(aData+""+bData);
 48                  * context.write(new Text(aData), new Text(bData));  //输出 key/value ,NullWritable.get()避免输出制表符
 49                 */
 50                 
 51                 context.write(new Text(aData), new Text(bData));  //输出 key/value 
 52             }catch(java.lang.ArrayIndexOutOfBoundsException e)
 53             {
 54                 context.getCounter(Counter.LINESKIP).increment(1);  //出错计数+1
 55                 return;
 56             }
 57         }
 58     }
 59     
 60     /*Reduce静态类
 61      * Reducer<
 62      * Text,    输入的 key 
 63      * Text,    输入的 value
 64      * Text,    输出的 key
 65      * Text     输出的 value
 66      * 
 67      * Reduce 的输入格式应与 Map的输出格式一致
 68      * >
 69      * */
 70     public static class Reduce extends Reducer<Text,Text,Text,Text>
 71     {
 72         public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException
 73         {
 74             String valuString;
 75             String outString="";
 76             for (Text value : values) 
 77             {
 78                 valuString=value.toString();
 79                 outString+=valuString+",";
 80             }
 81             context.write(key, new Text(outString));  //输出参数与定义格式一致,如果不是制表符分离的要换成空值
 82         }
 83     }
 84     
 85     /*run设置运行任务*/
 86     public int run(String[] args) throws Exception {
 87         Configuration conf = getConf();
 88         
 89         Job job = new Job(conf,"Example");   //作务名
 90         job.setJarByClass(Example.class);    //选择class
 91         
 92         FileInputFormat.setInputPaths(job, new Path(args[0]));    //输入路径
 93         FileOutputFormat.setOutputPath(job, new Path(args[1]));   //输出路径
 94         
 95         job.setMapperClass(Map.class);           //调用 Map class启动Map task     
 96         job.setReducerClass(Reduce.class);       //调用 Reduce class 雇用 Reduce task
 97         job.setCombinerClass(Reduce.class); 
 98         job.setInputFormatClass(TextInputFormat.class);    //输入格式
 99         job.setOutputFormatClass(TextOutputFormat.class);  //输出格式
100         //job.setOutputKeyClass(NullWritable.class);       //没有Reduce过程时,输出 key 格式,应该与指定的格式一致
101         job.setOutputKeyClass(Text.class);                 //输出 key 格式,应该与指定的格式一致
102         job.setOutputValueClass(Text.class);               //输出 value 格式  
103         
104         System.exit(job.waitForCompletion(true)?0:1);
105         return 0;
106     }
107     
108     /*主函数入口*/
109     public static void main(String[] args) throws Exception {
110         int res = ToolRunner.run(new Configuration(),new Example(),args);
111         System.exit(res);
112     }
113     
114 }

 

MapReduce 编程模板

标签:

原文地址:http://www.cnblogs.com/xp12/p/4234686.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!