码迷,mamicode.com
首页 > 其他好文 > 详细

MapReduce之Combiner组件

时间:2015-06-13 23:09:03      阅读:184      评论:0      收藏:0      [点我收藏+]

标签:combiner   mapreduce   

简述

Combiner的作用是把一个map产生的多个<KEY,VALUE>合并成一个新的<KEY,VALUE>,然后再将新<KEY,VALUE>的作为reduce的输入;

在map函数与reduce函数之间多了一个combine函数,目的是为了减少map输出的中间结果,这样减少了reduce复制map输出的数据,减少网络传输负载

并不是所有情况下都能使用Combiner,Combiner适用于对记录汇总的场景(如求和),但是,求平均数的场景就不能使用Combiner了。如果可以使用Combiner,一般情况下,和我们的reduce函数是一致的。

什么时候运行Combiner?

1、当job设置了Combiner,并且spill的个数到min.num.spill.for.combine(默认是3)的时候,那么combiner就会Merge之前执行;
2、但是有的情况下,Merge开始执行,但spill文件的个数没有达到需求,这个时候Combiner可能会在Merge之后执行;
3、Combiner也有可能不运行,Combiner会考虑当时集群的一个负载情况。如果集群负载量很大,会尽量提早执行完map,空出资源,所以,就不会去执行。

技术分享

实例代码:

package MyCombiner;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class CombinerExp {
    private final static String INPUT_PATH = "hdfs://master:8020/input";
    private final static String OUTPUT_PATH = "hdfs://master:8020/output.txt";
    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
        private IntWritable one = new IntWritable(1);//1
    private Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String[] str = value.toString().split("\\s+");

            for (String string : str) {
                System.out.println(string);
                word.set(string);
                context.write(word, one);
            }
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable,Text, IntWritable>{
        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context)
                throws IOException, InterruptedException {

            int sum = 0;
            for (IntWritable val : values) {
                sum+=val.get();
            }
            result.set(sum);
            context.write(key,result);
        }   
    }

    public static void main(String[] args) throws Exception {
        //1、配置  
        Configuration conf = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH),conf);
        if(fileSystem.exists(new Path(OUTPUT_PATH)))
        {
            fileSystem.delete(new Path(OUTPUT_PATH),true);
        }
        Job job = Job.getInstance(conf, "word count"); 

        //2、打包运行必须执行的方法
        job.setJarByClass(CombinerExp.class);

        //3、输入路径
        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));  
        //4、Map
        job.setMapperClass(MyMapper.class);

        //5、Combiner
        job.setCombinerClass(MyReducer.class);

        //6、Reducer
        //job.setReducerClass(MyReducer.class);
        job.setNumReduceTasks(0);//reduce个数默认是1

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //7、 输出路径
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        //8、提交作业
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }  
}
[root@master liguodong]# hdfs dfs -ls -R /input/
-rw-r--r--   1 root supergroup         27 2015-06-13 22:15 /input/input1
-rw-r--r--   1 root supergroup         38 2015-06-13 22:15 /input/input2

当我们只有map和combine而没有reduce时,combine并不会执行。
而输出的结果并没有被求和。
[root@master liguodong]# hdfs dfs -ls -R /output/
-rw-r--r--   3 liguodong supergroup          0 2015-06-13 22:17 /output/_SUCCESS
-rw-r--r--   3 liguodong supergroup         50 2015-06-13 22:17 /output/part-m-00000
-rw-r--r--   3 liguodong supergroup         39 2015-06-13 22:17 /output/part-m-00001


[root@master liguodong]# hdfs dfs -cat /output/part-m-00000
hello   1
you     1
hello   1
everyone        1
hello   1
hadoop  1
[root@master liguodong]# hdfs dfs -cat /output/part-m-00001
hello   1
you     1
hello   1
me      1
hi      1
baby    1

当我们把第79行注释取消,将80行注释的时候,将会执行combine函数。
[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 32
    File System Counters
        ......
    Map-Reduce Framework
        Map input records=6
        Map output records=12
        ......
        Input split bytes=192
        Combine input records=12
        Combine output records=9
        ......
        Reduce input records=9
        Reduce output records=7
        Spilled Records=18
        ......
        Virtual memory (bytes) snapshot=0
        Total committed heap usage (bytes)=457912320
    File Input Format Counters 
        Bytes Read=65
    File Output Format Counters 
        Bytes Written=51

[root@master hadoop]# hdfs dfs -ls -R /output/
-rw-r--r--   3 liguodong supergroup          0 2015-06-13 22:41 /output/_SUCCESS
-rw-r--r--   3 liguodong supergroup         51 2015-06-13 22:41 /output/part-r-00000

[root@master hadoop]# hdfs dfs -cat /output/pa*
baby    1
everyone        1
hadoop  1
hello   5
hi      1
me      1
you     2

MapReduce之Combiner组件

标签:combiner   mapreduce   

原文地址:http://blog.csdn.net/scgaliguodong123_/article/details/46483455

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!