标签:
1.combiner
combiner是MR编程模型中的一个组件;
有些任务中map可能会产生大量的本地输出,combiner的作用就是在map端对输出先做一次合并,以减少map和reduce节点之间的数据传输量,提高网络IO性能,是MR的优化手段之一;
两大基本功能:
1.1map的输出的key的聚合,对map输出的key排序、value进行迭代;
1.2reduce功能。
并不是设置了combiner就一定会执行(在当前集群非常繁忙的时候设置了也不会执行);
combiner的执行时机:combiner的执行可能会在map的merge之前也可能在之后,这个参数由配置选项min.num.spill.for.combine(默认为3) 决定的,当map端产生的spill文件最少有这么3个时,combiner会在merge操作之前执行,否则之后。
一般情况下可以使用自己写的reduce类作为combiner,但是特殊情况下也可以自定义
1 public static class combiner extends Reducer<Text,Text,Text,Text>{ 2 private Text info = new Text(); //为了拆分 key值 准备存储新的value值 3 public void reduce(Text key,Iterable<Text>values,Context context) throws IOException, InterruptedException{ 4 int sum = 0; 5 for(Text val:values){ 6 sum += Integer.parseInt(val.toString()); 7 } 8 int splitIndex = key.toString().indexOf(":"); 9 info.set(key.toString().substring(splitIndex+1)+":"+sum); //新的value值 10 key.set(key.toString().substring(0, splitIndex)); 11 context.write(key, info); 12 } 13 }
job.setCombinerClass(combiner.class);
2.partitioner
combiner可以减少map的输出到reducer所在节点的网络IO,但是map的输出被分配到哪个reducer上,是由partitioner决定的;
partitioner只有一个方法:
getPartition(Text key, Text value, int numPartitions)
输入的是map的结果对<key,value>和reducer的数目,返回的则是分配的reducer的编号(整数)。系统缺省的partitioner是HashPartitioner,它以key的Hash值对reducer的数目取模,得到对应的reducer。这样可以保证如果有相同的key值则肯定会被分配到同一个reducer上;
和combiner一样,一般使用默认的,但是特殊情况也可以自定义
1 public class NewPartition extends HashPartitioner<Text,Text>{ 2 String keyinfo; 3 public int getPartition(Text key,Text value,int numReducerTasks){ 4 keyinfo = key.toString().split(":")[0]; 5 return super.getPartition(new Text(keyinfo), value, numReducerTasks); 6 } 7 }
job.setPartitionClass(NewPartition.class);
标签:
原文地址:http://www.cnblogs.com/admln/p/MR-combiner-partitioner.html