码迷,mamicode.com
首页 > 其他好文 > 详细

hadoop1中partition和combiner作用

时间:2015-04-06 23:14:26      阅读:332      评论:0      收藏:0      [点我收藏+]

标签:

---恢复内容开始---

1、解析Partiton

  把map任务的输出的中间结果按照key的范围进行划分成r份,r代表reduce任务的个数。hadoop默认有个类HashPartition实现分区,通过key对reduce的个数取模(key%r),这样可以保证一段范围内的key交由一个reduce处理。以此来实现reduce的负载均衡。不至于使有些reduce处理的任务压力过大,有些reduce空闲。

  如果我们对hadoop本身的分区算法不满意,或者我们因为我们的业务需求,我们可以自定义一个类实现Partition接口,实现里面的方法,在getPartiton()方法中实现自己的分区算法。在提交作业的main方法中通setPartitonclass()方法这个类,就可以了。

 以下为代码实例

  

  1. package org.apache.hadoop.examples;  
  2.   
  3. import java.io.IOException;  
  4. import java.util.*;  
  5. import org.apache.hadoop.fs.Path;  
  6. import org.apache.hadoop.conf.*;  
  7. import org.apache.hadoop.io.*;  
  8. import org.apache.hadoop.mapred.*;  
  9. import org.apache.hadoop.util.*;  
  10.   
  11. /** 
  12.  * 输入文本,以tab间隔 
  13.  * kaka    1       28 
  14.  * hua     0       26 
  15.  * chao    1 
  16.  * tao     1       22 
  17.  * mao     0       29      22 
  18.  * */  
  19.   
  20. //Partitioner函数的使用  
  21.   
  22. public class MyPartitioner {  
  23.     // Map函数  
  24.     public static class MyMap extends MapReduceBase implements  
  25.             Mapper<LongWritable, Text, Text, Text> {  
  26.         public void map(LongWritable key, Text value,  
  27.                 OutputCollector<Text, Text> output, Reporter reporter)  
  28.                 throws IOException {  
  29.             String[] arr_value = value.toString().split("\t");  
  30.             //测试输出  
  31. //          for(int i=0;i<arr_value.length;i++)  
  32. //          {  
  33. //              System.out.print(arr_value[i]+"\t");  
  34. //          }  
  35. //          System.out.print(arr_value.length);  
  36. //          System.out.println();         
  37.             Text word1 = new Text();  
  38.             Text word2 = new Text();  
  39.             if (arr_value.length > 3) {  
  40.                 word1.set("long");  
  41.                 word2.set(value);  
  42.             } else if (arr_value.length < 3) {  
  43.                 word1.set("short");  
  44.                 word2.set(value);  
  45.             } else {  
  46.                 word1.set("right");  
  47.                 word2.set(value);  
  48.             }  
  49.             output.collect(word1, word2);  
  50.         }  
  51.     }  
  52.       
  53.     public static class MyReduce extends MapReduceBase implements  
  54.             Reducer<Text, Text, Text, Text> {  
  55.         public void reduce(Text key, Iterator<Text> values,  
  56.                 OutputCollector<Text, Text> output, Reporter reporter)  
  57.                 throws IOException {  
  58.             int sum = 0;  
  59.             System.out.println(key);  
  60.             while (values.hasNext()) {  
  61.                 output.collect(key, new Text(values.next().getBytes()));      
  62.             }  
  63.         }  
  64.     }  
  65.   
  66.     // 接口Partitioner继承JobConfigurable,所以这里有两个override方法  
  67.     public static class MyPartitionerPar implements Partitioner<Text, Text> {  
  68.         /** 
  69.          * getPartition()方法的 
  70.          * 输入参数:键/值对<key,value>与reducer数量numPartitions 
  71.          * 输出参数:分配的Reducer编号,这里是result 
  72.          * */  
  73.         @Override  
  74.         public int getPartition(Text key, Text value, int numPartitions) {  
  75.             // TODO Auto-generated method stub  
  76.             int result = 0;  
  77.             System.out.println("numPartitions--" + numPartitions);  
  78.             if (key.toString().equals("long")) {  
  79.                 result = 0 % numPartitions;  
  80.             } else if (key.toString().equals("short")) {  
  81.                 result = 1 % numPartitions;  
  82.             } else if (key.toString().equals("right")) {  
  83.                 result = 2 % numPartitions;  
  84.             }  
  85.             System.out.println("result--" + result);  
  86.             return result;  
  87.         }  
  88.           
  89.         @Override  
  90.         public void configure(JobConf arg0)   
  91.         {  
  92.             // TODO Auto-generated method stub  
  93.         }  
  94.     }  
  95.   
  96.     //输入参数:/home/hadoop/input/PartitionerExample /home/hadoop/output/Partitioner  
  97.     public static void main(String[] args) throws Exception {  
  98.         JobConf conf = new JobConf(MyPartitioner.class);  
  99.         conf.setJobName("MyPartitioner");  
  100.           
  101.         //控制reducer数量,因为要分3个区,所以这里设定了3个reducer  
  102.         conf.setNumReduceTasks(3);  
  103.   
  104.         conf.setMapOutputKeyClass(Text.class);  
  105.         conf.setMapOutputValueClass(Text.class);  
  106.   
  107.         //设定分区类  
  108.         conf.setPartitionerClass(MyPartitionerPar.class);  
  109.   
  110.         conf.setOutputKeyClass(Text.class);  
  111.         conf.setOutputValueClass(Text.class);  
  112.   
  113.         //设定mapper和reducer类  
  114.         conf.setMapperClass(MyMap.class);  
  115.         conf.setReducerClass(MyReduce.class);  
  116.   
  117.         conf.setInputFormat(TextInputFormat.class);  
  118.         conf.setOutputFormat(TextOutputFormat.class);  
  119.   
  120.         FileInputFormat.setInputPaths(conf, new Path(args[0]));  
  121.         FileOutputFormat.setOutputPath(conf, new Path(args[1]));  
  122.   
  123.         JobClient.runJob(conf);  
  124.     }  
  125. }  

 

2、解析Combiner

  在Partiton之前,我们还可以对中间结果进行Combiner,即将中间结果中有着相同key 的(key,value)键值对进行合并成一对,Combiner的过程与reduce的过程类似,很多情况下可以直接使用reduce,但是Combiner作为Map任务的一部分,在Map输出后紧接着执行,通过Combiner的执行,减少了中间结果中的(key,value)对数目,reduce在从map复制数据时将会大大减少网络流量,每个reduce需要和原许多个map任务节点通信以此来取得落到它负责key区间内的中间结果,然后执行reduce函数,得到一个最中结果文件。有R个reduce任务,就有R个最终结果,这R个最终结果并不需要合并成一个结果,因为这R个最终结果又可以作为另一次计算的输入,开始另一次计算。

  combiner使用总结:

  combiner的使用可以在满足业务需求的情况下,大大提高job的运行速度,如果不合适,则将到最后导致结果不一致(如:求平均值)。

  以下为Combiner代码示例

  1. package com;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.conf.Configuration;  
  6. import org.apache.hadoop.conf.Configured;  
  7. import org.apache.hadoop.fs.Path;  
  8. import org.apache.hadoop.io.DoubleWritable;  
  9. import org.apache.hadoop.io.LongWritable;  
  10. import org.apache.hadoop.io.Text;  
  11. import org.apache.hadoop.mapreduce.Job;  
  12. import org.apache.hadoop.mapreduce.Mapper;  
  13. import org.apache.hadoop.mapreduce.Reducer;  
  14. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  15. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  16. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  17. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  18. import org.apache.hadoop.util.Tool;  
  19. import org.apache.hadoop.util.ToolRunner;  
  20.   
  21. public class AveragingWithCombiner extends Configured implements Tool {  
  22.   
  23.     public static class MapClass extends Mapper<LongWritable,Text,Text,Text> {  
  24.           
  25.         static enum ClaimsCounters { MISSING, QUOTED };  
  26.         // Map Method  
  27.         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
  28.             String fields[] = value.toString().split(",", -20);  
  29.             String country = fields[4];  
  30.             String numClaims = fields[8];  
  31.               
  32.             if (numClaims.length() > 0 && !numClaims.startsWith("\"")) {  
  33.                 context.write(new Text(country), new Text(numClaims + ",1"));  
  34.             }  
  35.         }  
  36.     }  
  37.       
  38.     public static class Reduce extends Reducer<Text,Text,Text,DoubleWritable> {  
  39.           
  40.         // Reduce Method  
  41.         public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {  
  42.             double sum = 0;  
  43.             int count = 0;  
  44.             for (Text value : values) {  
  45.                 String fields[] = value.toString().split(",");  
  46.                 sum += Double.parseDouble(fields[0]);  
  47.                 count += Integer.parseInt(fields[1]);  
  48.             }  
  49.             context.write(key, new DoubleWritable(sum/count));  
  50.         }  
  51.     }  
  52.       
  53.     public static class Combine extends Reducer<Text,Text,Text,Text> {  
  54.           
  55.         // Reduce Method  
  56.         public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {  
  57.             double sum = 0;  
  58.             int count = 0;  
  59.             for (Text value : values) {  
  60.                 String fields[] = value.toString().split(",");  
  61.                 sum += Double.parseDouble(fields[0]);  
  62.                 count += Integer.parseInt(fields[1]);  
  63.             }  
  64.             context.write(key, new Text(sum+","+count));  
  65.         }  
  66.     }  
  67.       
  68.     // run Method  
  69.     public int run(String[] args) throws Exception {  
  70.         // Create and Run the Job  
  71.         Job job = new Job();  
  72.         job.setJarByClass(AveragingWithCombiner.class);  
  73.           
  74.         FileInputFormat.addInputPath(job, new Path(args[0]));  
  75.         FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  76.           
  77.         job.setJobName("AveragingWithCombiner");  
  78.         job.setMapperClass(MapClass.class);  
  79.         job.setCombinerClass(Combine.class);  
  80.         job.setReducerClass(Reduce.class);  
  81.         job.setInputFormatClass(TextInputFormat.class);  
  82.         job.setOutputFormatClass(TextOutputFormat.class);  
  83.           
  84.         job.setOutputKeyClass(Text.class);  
  85.         job.setOutputValueClass(Text.class);  
  86.           
  87.         System.exit(job.waitForCompletion(true) ? 0 : 1);  
  88.         return 0;  
  89.     }  
  90.       
  91.     public static void main(String[] args) throws Exception {  
  92.         int res = ToolRunner.run(new Configuration(), new AveragingWithCombiner(), args);  
  93.         System.exit(res);  
  94.     }  
  95.   
  96. }  

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

---恢复内容结束---

hadoop1中partition和combiner作用

标签:

原文地址:http://www.cnblogs.com/sy270321/p/4396843.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!