标签:
一:背景
在MapReduce模型中,reduce的功能大多是统计分类类型的总量、求最大值最小值等,对于这些操作可以考虑在Map输出后进行Combiner操作,这样可以减少网络传输负载,同时减轻reduce任务的负担。Combiner操作是运行在每个节点上的,只会影响本地Map的输出结果,Combiner的输入为本地map的输出结果,很多时候Combiner的逻辑和reduce的逻辑是相同的,因此两者可以共用reducer体。
二:什么时候运行Combiner
(1):当job设置了Combiner,并且spill的个数达到了min.num.spill.for.combine(默认是3)的时候,那么Combiner就会在merge之前执行。
(2):但是有的情况下,merge开始执行,但spill文件的个数没有达到需求,这个时候Combiner可能会在merge之后运行。
(3):Combiner也有可能不运行,Combiner会考虑当时集群的一个负载情况。
三:程序代码
- public class WordCountTest {
-
-
- private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/hello";
-
- private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";
-
- public static void main(String[] args) {
-
- try {
-
- Configuration conf = new Configuration();
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
-
- if (fileSystem.exists(new Path(OUT_PATH))) {
- fileSystem.delete(new Path(OUT_PATH), true);
- }
-
-
- Job job = new Job(conf, WordCountTest.class.getName());
-
-
- FileInputFormat.setInputPaths(job, INPUT_PATH);
- job.setInputFormatClass(TextInputFormat.class);
-
-
- job.setMapperClass(MyMapper.class);
-
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(LongWritable.class);
-
-
- job.setPartitionerClass(HashPartitioner.class);
- job.setNumReduceTasks(1);
-
-
-
- job.setCombinerClass(MyReducer.class);
-
-
- job.setReducerClass(MyReducer.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(LongWritable.class);
-
-
- FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
- job.setOutputFormatClass(TextOutputFormat.class);
-
-
-
- System.exit(job.waitForCompletion(true) ? 0 : 1);
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
-
-
- LongWritable oneTime = new LongWritable(1);
-
- Text word = new Text();
-
- protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException,
- InterruptedException {
-
-
- String[] splits = value.toString().split("\t");
-
-
- for (String str : splits) {
-
-
- word.set(str);
-
- context.write(word, oneTime);
- }
- }
- }
-
- public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
-
-
- LongWritable result = new LongWritable();
-
- protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException,
- InterruptedException {
-
- int sum = 0;
-
-
- for (LongWritable s : values) {
-
- sum += s.get();
- }
-
- result.set(sum);
-
- context.write(key, result);
- }
- }
- }
程序使用的数据:
程序运行流程:
(1):Map的输入记录为3即<0,hello you> <10,hello me> <19,you me love>
(2):Map的输出记录为7即<hello,1> <you,1> <hello,1> <me,1> <you,1> <me,1> <love,1>
(3):排序分组后的记录为<hello,{1,1}> <love,{1}> <me,{1,1}> <you,{1}> 分了4组,但是记录数依然是7,只不过是分了组而已。
(4):进入Combiner的记录为7,经过Combiner之后的结果为<hello,2> <love,1> <me,2> <you,1>即Combiner的输出为4条记录
四:使用Combiner的限制
并不是所有情况都能使用Combiner,Combiner适用的场景是汇总求和、求最值的场景,但是对于求平均数的场景就不适用了。因为如果在求平均数的程序中使用了Combiner即在每个Map后都使用Combiner进行求平均,每个map计算出的平均值到了reduce端再进行平均,结果和正真的平均数就有出入了。
Hadoop Combiner组件
标签:
原文地址:http://www.cnblogs.com/thinkpad/p/5173732.html